/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.server;

import io.reactivesocket.ClientReactiveSocket;
import io.reactivesocket.ConnectionSetupPayload;
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.FrameType;
import io.reactivesocket.ServerReactiveSocket;
import io.reactivesocket.StreamIdSupplier;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.events.AbstractEventSource;
import io.reactivesocket.events.ConnectionEventInterceptor;
import io.reactivesocket.events.ServerEventListener;
import io.reactivesocket.internal.ClientServerInputMultiplexer;
import io.reactivesocket.lease.DefaultLeaseHonoringSocket;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.util.Clock;
import org.reactivestreams.Subscriber;

public final class DefaultReactiveSocketServer
extends AbstractEventSource<ServerEventListener>
implements ReactiveSocketServer {
    private final TransportServer transportServer;

    public DefaultReactiveSocketServer(TransportServer transportServer) {
        this.transportServer = transportServer;
    }

    @Override
    public TransportServer.StartedServer start(ReactiveSocketServer.SocketAcceptor acceptor) {
        return this.transportServer.start((DuplexConnection connection) -> {
            DuplexConnection dc;
            if (this.isEventPublishingEnabled()) {
                long startTime = Clock.now();
                dc = new ConnectionEventInterceptor(connection, this);
                ((ServerEventListener)this.getEventListener()).socketAccepted();
                dc.onClose().subscribe((Subscriber)Subscribers.doOnTerminate(() -> {
                    if (this.isEventPublishingEnabled()) {
                        ((ServerEventListener)this.getEventListener()).socketClosed(Clock.elapsedSince(startTime), Clock.unit());
                    }
                }));
            } else {
                dc = connection;
            }
            return Px.from(dc.receive()).switchTo(setupFrame -> {
                if (setupFrame.getType() == FrameType.SETUP) {
                    ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(dc);
                    ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame);
                    ClientReactiveSocket sender = new ClientReactiveSocket(multiplexer.asServerConnection(), Throwable::printStackTrace, StreamIdSupplier.serverSupplier(), KeepAliveProvider.never(), this);
                    DefaultLeaseHonoringSocket lhs = new DefaultLeaseHonoringSocket(sender);
                    sender.start(lhs);
                    LeaseEnforcingSocket handler = acceptor.accept(setup, sender);
                    ServerReactiveSocket receiver = new ServerReactiveSocket(multiplexer.asClientConnection(), handler, setup.willClientHonorLease(), Throwable::printStackTrace, this);
                    receiver.start();
                    return dc.onClose();
                }
                return Px.error((Throwable)new IllegalStateException("Invalid first frame on the connection: " + dc + ", frame type received: " + (Object)((Object)setupFrame.getType())));
            });
        });
    }
}

