/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.client;

import io.reactivesocket.ClientReactiveSocket;
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.FrameType;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.ServerReactiveSocket;
import io.reactivesocket.StreamIdSupplier;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.SetupProvider;
import io.reactivesocket.events.AbstractEventSource;
import io.reactivesocket.events.ClientEventListener;
import io.reactivesocket.events.ConnectionEventInterceptor;
import io.reactivesocket.internal.ClientServerInputMultiplexer;
import io.reactivesocket.internal.DisabledEventPublisher;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.lease.DisableLeaseSocket;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.lease.LeaseHonoringSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.publishers.InstrumentingPublisher;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.util.Clock;
import io.reactivesocket.util.PayloadImpl;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * {@link SetupProvider} implementation backed by a SETUP {@link Frame}.
 *
 * <p>All fields are final; every configuration method ({@link #dataMimeType}, {@link #metadataMimeType},
 * {@link #honorLease}, {@link #disableLease()}, {@link #setupPayload}) returns a new
 * {@code SetupProviderImpl} rather than modifying this one.
 */
final class SetupProviderImpl
extends AbstractEventSource<ClientEventListener>
implements SetupProvider {
    /**
     * Mask that clears bit {@code 0x20} of the setup flags. It is applied wherever this class switches
     * leasing off (presumably that bit is the "honor lease" flag -- confirm against the frame definition).
     */
    private static final int LEASE_FLAG_CLEAR_MASK = 0xFFFFFFDF;

    private final Frame setupFrame;
    private final Function<ReactiveSocket, ? extends LeaseHonoringSocket> leaseDecorator;
    private final Consumer<Throwable> errorConsumer;
    private final KeepAliveProvider keepAliveProvider;

    /**
     * @param setupFrame        frame sent when a connection is accepted; must be of type {@link FrameType#SETUP}
     *                          (checked via {@code Frame.ensureFrameType})
     * @param leaseDecorator    wraps the client-side socket into the socket handed to the caller
     * @param keepAliveProvider passed through to every {@link ClientReactiveSocket} created by this provider
     * @param errorConsumer     passed through to both the client and the server socket
     */
    SetupProviderImpl(Frame setupFrame, Function<ReactiveSocket, ? extends LeaseHonoringSocket> leaseDecorator, KeepAliveProvider keepAliveProvider, Consumer<Throwable> errorConsumer) {
        this.keepAliveProvider = keepAliveProvider;
        this.errorConsumer = errorConsumer;
        Frame.ensureFrameType(FrameType.SETUP, setupFrame);
        this.leaseDecorator = leaseDecorator;
        this.setupFrame = setupFrame;
    }

    /**
     * Runs the setup on {@code connection} and instruments the resulting publisher so that connect
     * start / success / failure / cancellation are reported to the event listener.
     *
     * @param connection connection to set up; wrapped in a {@link ConnectionEventInterceptor} when event
     *                   publishing is enabled
     * @param acceptor   produces the socket that handles requests arriving from the peer
     * @return publisher emitting the lease-decorated client socket
     */
    @Override
    public Publisher<ReactiveSocket> accept(DuplexConnection connection, ReactiveSocketClient.SocketAcceptor acceptor) {
        DuplexConnection dc = this.isEventPublishingEnabled()
                ? new ConnectionEventInterceptor(connection, this)
                : connection;
        Publisher<ReactiveSocket> source = this._setup(dc, acceptor);
        // The diamond is required: with a raw InstrumentingPublisher the unbound method references below
        // would be resolved against an Object receiver and could not type-check.
        return new InstrumentingPublisher<>(
                source,
                // One inspector per subscription; the shared no-op instance is used while publishing is off.
                subscriber -> this.isEventPublishingEnabled() ? new ConnectInspector(this) : ConnectInspector.EMPTY,
                ConnectInspector::connectFailed,
                null,
                ConnectInspector::connectCancelled,
                ConnectInspector::connectSuccess);
    }

    /**
     * @param dataMimeType data MIME type to put into the setup frame
     * @return a new provider whose setup frame differs only in its data MIME type
     */
    @Override
    public SetupProvider dataMimeType(String dataMimeType) {
        Frame newSetup = Frame.Setup.from(
                Frame.Setup.getFlags(this.setupFrame),
                Frame.Setup.keepaliveInterval(this.setupFrame),
                Frame.Setup.maxLifetime(this.setupFrame),
                Frame.Setup.metadataMimeType(this.setupFrame),
                dataMimeType,
                this.setupFrame);
        return new SetupProviderImpl(newSetup, this.leaseDecorator, this.keepAliveProvider, this.errorConsumer);
    }

    /**
     * @param metadataMimeType metadata MIME type to put into the setup frame
     * @return a new provider whose setup frame differs only in its metadata MIME type
     */
    @Override
    public SetupProvider metadataMimeType(String metadataMimeType) {
        Frame newSetup = Frame.Setup.from(
                Frame.Setup.getFlags(this.setupFrame),
                Frame.Setup.keepaliveInterval(this.setupFrame),
                Frame.Setup.maxLifetime(this.setupFrame),
                metadataMimeType,
                Frame.Setup.dataMimeType(this.setupFrame),
                this.setupFrame);
        return new SetupProviderImpl(newSetup, this.leaseDecorator, this.keepAliveProvider, this.errorConsumer);
    }

    /**
     * @param leaseDecorator factory wrapping the client socket into a {@link LeaseHonoringSocket}
     * @return a new provider using {@code leaseDecorator}; the setup frame is reused unchanged
     */
    @Override
    public SetupProvider honorLease(Function<ReactiveSocket, LeaseHonoringSocket> leaseDecorator) {
        return new SetupProviderImpl(this.setupFrame, leaseDecorator, this.keepAliveProvider, this.errorConsumer);
    }

    /**
     * Disables leasing, wrapping sockets with {@link DisableLeaseSocket}.
     *
     * @return a new provider with leasing disabled
     */
    @Override
    public SetupProvider disableLease() {
        return this.disableLease(DisableLeaseSocket::new);
    }

    /**
     * Disables leasing: clears the lease bit in the setup flags and installs {@code socketFactory} as the
     * socket decorator.
     *
     * @param socketFactory factory wrapping the client socket into a {@link DisableLeaseSocket}
     * @return a new provider with leasing disabled
     */
    @Override
    public SetupProvider disableLease(Function<ReactiveSocket, DisableLeaseSocket> socketFactory) {
        Frame newSetup = Frame.Setup.from(
                Frame.Setup.getFlags(this.setupFrame) & LEASE_FLAG_CLEAR_MASK,
                Frame.Setup.keepaliveInterval(this.setupFrame),
                Frame.Setup.maxLifetime(this.setupFrame),
                Frame.Setup.metadataMimeType(this.setupFrame),
                Frame.Setup.dataMimeType(this.setupFrame),
                this.setupFrame);
        return new SetupProviderImpl(newSetup, socketFactory, this.keepAliveProvider, this.errorConsumer);
    }

    /**
     * Replaces the payload carried by the setup frame.
     *
     * <p>NOTE(review): besides swapping the payload, this method also clears the lease bit and replaces the
     * configured lease decorator with {@link DisableLeaseSocket}, exactly like {@link #disableLease()}.
     * That looks unintended for a payload setter, but the behaviour is preserved here because existing
     * callers may depend on it -- confirm before changing.
     *
     * @param setupPayload payload for the new setup frame
     * @return a new provider carrying {@code setupPayload}, with leasing disabled
     */
    @Override
    public SetupProvider setupPayload(Payload setupPayload) {
        Frame newSetup = Frame.Setup.from(
                Frame.Setup.getFlags(this.setupFrame) & LEASE_FLAG_CLEAR_MASK,
                Frame.Setup.keepaliveInterval(this.setupFrame),
                Frame.Setup.maxLifetime(this.setupFrame),
                Frame.Setup.metadataMimeType(this.setupFrame),
                Frame.Setup.dataMimeType(this.setupFrame),
                setupPayload);
        return new SetupProviderImpl(newSetup, reactiveSocket -> new DisableLeaseSocket(reactiveSocket), this.keepAliveProvider, this.errorConsumer);
    }

    /**
     * Builds a new setup frame with the same flags, timings and MIME types as the stored one, using
     * {@code duplicate()}d data and metadata buffers (presumably so that sending the copy does not disturb
     * the read position of the stored frame's buffers -- confirm).
     */
    private Frame copySetupFrame() {
        return Frame.Setup.from(
                Frame.Setup.getFlags(this.setupFrame),
                Frame.Setup.keepaliveInterval(this.setupFrame),
                Frame.Setup.maxLifetime(this.setupFrame),
                Frame.Setup.metadataMimeType(this.setupFrame),
                Frame.Setup.dataMimeType(this.setupFrame),
                new PayloadImpl(this.setupFrame.getData().duplicate(), this.setupFrame.getMetadata().duplicate()));
    }

    /**
     * Sends a copy of the setup frame over {@code connection} and concatenates a deferred step that
     * builds and starts the client and server sockets on top of a shared input multiplexer.
     *
     * @return publisher emitting the lease-decorated client socket
     */
    private Publisher<ReactiveSocket> _setup(DuplexConnection connection, ReactiveSocketClient.SocketAcceptor acceptor) {
        // NOTE(review): the raw (Publisher)/(Object) casts come from decompilation; they are kept because the
        // generic signatures of Px are not visible from this file.
        return Px.from(connection.sendOne(this.copySetupFrame())).cast(ReactiveSocket.class).concatWith((Publisher)Px.defer(() -> {
            ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection);

            // Outgoing side: requests initiated by this client.
            ClientReactiveSocket sendingSocket = new ClientReactiveSocket(multiplexer.asClientConnection(), this.errorConsumer, StreamIdSupplier.clientSupplier(), this.keepAliveProvider, this);
            LeaseHonoringSocket leaseHonoringSocket = this.leaseDecorator.apply(sendingSocket);
            sendingSocket.start(leaseHonoringSocket);

            // Incoming side: requests initiated by the peer are handled by the acceptor's socket.
            LeaseEnforcingSocket acceptingSocket = acceptor.accept(sendingSocket);
            ServerReactiveSocket receivingSocket = new ServerReactiveSocket(multiplexer.asServerConnection(), acceptingSocket, true, this.errorConsumer, this);
            receivingSocket.start();

            return Px.just((Object)leaseHonoringSocket);
        }));
    }

    /**
     * Per-subscription state used by {@link #accept} to report connect lifecycle events.
     *
     * <p>The start timestamp is taken from {@link Clock#now()}, so every elapsed duration reported from
     * this class is computed with {@link Clock#elapsedSince(long)} and labelled with {@link Clock#unit()}
     * to keep the time source and the unit consistent.
     */
    private static class ConnectInspector {
        /** Shared instance backed by a {@link DisabledEventPublisher}; used when event publishing is off. */
        private static final ConnectInspector EMPTY = new ConnectInspector(new DisabledEventPublisher<ClientEventListener>());
        private final EventPublisher<ClientEventListener> publisher;
        private final long startTime;

        /**
         * Records the start time and, if publishing is enabled, emits {@code connectStart}.
         */
        public ConnectInspector(EventPublisher<ClientEventListener> publisher) {
            this.publisher = publisher;
            this.startTime = Clock.now();
            if (publisher.isEventPublishingEnabled()) {
                publisher.getEventListener().connectStart();
            }
        }

        /**
         * Emits {@code connectCompleted} and subscribes to {@code socket.onClose()} so that
         * {@code socketClosed} is emitted when the socket terminates.
         */
        public void connectSuccess(ReactiveSocket socket) {
            if (this.publisher.isEventPublishingEnabled()) {
                this.publisher.getEventListener().connectCompleted(() -> socket.availability(), Clock.elapsedSince(this.startTime), Clock.unit());
                socket.onClose().subscribe((Subscriber)Subscribers.doOnTerminate(() -> {
                    // Publishing may have been switched off between connect and close, so re-check.
                    if (this.publisher.isEventPublishingEnabled()) {
                        this.publisher.getEventListener().socketClosed(Clock.elapsedSince(this.startTime), Clock.unit());
                    }
                }));
            }
        }

        /** Emits {@code connectFailed} with the time elapsed since construction and the failure cause. */
        public void connectFailed(Throwable cause) {
            if (this.publisher.isEventPublishingEnabled()) {
                this.publisher.getEventListener().connectFailed(Clock.elapsedSince(this.startTime), Clock.unit(), cause);
            }
        }

        /** Emits {@code connectCancelled} with the time elapsed since construction. */
        public void connectCancelled() {
            if (this.publisher.isEventPublishingEnabled()) {
                this.publisher.getEventListener().connectCancelled(Clock.elapsedSince(this.startTime), Clock.unit());
            }
        }
    }
}

