/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.FrameType;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.events.EventListener;
import io.reactivesocket.events.EventPublishingSocket;
import io.reactivesocket.events.EventPublishingSocketImpl;
import io.reactivesocket.exceptions.ApplicationException;
import io.reactivesocket.exceptions.SetupException;
import io.reactivesocket.internal.DisabledEventPublisher;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.RemoteReceiver;
import io.reactivesocket.internal.RemoteSender;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import io.reactivesocket.util.Clock;
import java.util.Collection;
import java.util.function.Consumer;
import org.agrona.collections.Int2ObjectHashMap;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ServerReactiveSocket
implements ReactiveSocket {
    /** Transport this socket reads inbound frames from and writes outbound frames to. */
    private final DuplexConnection connection;
    /** Inbound frame stream, obtained from {@code connection.receive()} in the constructor. */
    private final Publisher<Frame> serverInput;
    /** Error sink; the constructor wraps the caller-supplied consumer in a {@link KnownErrorFilter}. */
    private final Consumer<Throwable> errorConsumer;
    /** Source of the {@link EventListener} notified about received requests. */
    private final EventPublisher<? extends EventListener> eventPublisher;
    /** Subscriptions keyed by stream id; looked up for REQUEST_N and CANCEL frames and cancelled on cleanup. */
    private final Int2ObjectHashMap<Subscription> subscriptions;
    /** Receivers for channel requests keyed by stream id; fed by NEXT, COMPLETE, ERROR and NEXT_COMPLETE frames. */
    private final Int2ObjectHashMap<RemoteReceiver> channelProcessors;
    /** Handler that every request method of this socket delegates to. */
    private final ReactiveSocket requestHandler;
    /** Assigned in {@link #start()} when the inbound stream is subscribed; handed to each RemoteReceiver created for a channel. */
    private Subscription receiversSubscription;
    /** Decorates sends/receives with events, or {@code EventPublishingSocket.DISABLED} when publishing is off. */
    private final EventPublishingSocket eventPublishingSocket;

    /**
     * Creates a server-side socket that hands requests arriving on {@code connection} to
     * {@code requestHandler}.
     *
     * <p>Besides storing its collaborators, the constructor subscribes a
     * {@code Subscribers.cleanup(...)} subscriber to {@code connection.onClose()} that invokes
     * {@link #cleanup()}, and, when the handler is a {@link LeaseEnforcingSocket}, registers a
     * lease sender that turns each lease into a LEASE frame sent over the connection.
     *
     * @param connection        transport to read frames from and write frames to
     * @param requestHandler    handler all requests are delegated to
     * @param clientHonorsLease when {@code false}, leases from the handler are not sent
     * @param errorConsumer     error sink; stored wrapped in a {@link KnownErrorFilter}
     * @param eventPublisher    provides the listener for request events
     */
    public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler, boolean clientHonorsLease, Consumer<Throwable> errorConsumer, EventPublisher<? extends EventListener> eventPublisher) {
        this.requestHandler = requestHandler;
        this.connection = connection;
        this.serverInput = connection.receive();
        this.errorConsumer = new KnownErrorFilter(errorConsumer);
        this.eventPublisher = eventPublisher;
        this.subscriptions = new Int2ObjectHashMap<>();
        this.channelProcessors = new Int2ObjectHashMap<>();
        if (eventPublisher.isEventPublishingEnabled()) {
            this.eventPublishingSocket = new EventPublishingSocketImpl(eventPublisher, false);
        } else {
            this.eventPublishingSocket = EventPublishingSocket.DISABLED;
        }

        Px.from(connection.onClose()).subscribe(Subscribers.cleanup(() -> cleanup()));

        if (!(requestHandler instanceof LeaseEnforcingSocket)) {
            return;
        }
        LeaseEnforcingSocket leaseSource = (LeaseEnforcingSocket) requestHandler;
        leaseSource.acceptLeaseSender(lease -> {
            if (clientHonorsLease) {
                Frame leaseFrame = Frame.Lease.from(lease.getTtl(), lease.getAllowedRequests(), lease.metadata());
                // Note: errors go to the caller-supplied consumer directly, not to the
                // KnownErrorFilter-wrapped field used by the other send paths.
                Px.from(connection.sendOne(leaseFrame)).doOnError(errorConsumer).subscribe();
            }
        });
    }

    /**
     * Creates a socket with event publishing disabled by passing a new
     * {@link DisabledEventPublisher} to the five-argument constructor.
     *
     * @param connection        transport to read frames from and write frames to
     * @param requestHandler    handler all requests are delegated to
     * @param clientHonorsLease when {@code false}, leases from the handler are not sent
     * @param errorConsumer     error sink
     */
    public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler, boolean clientHonorsLease, Consumer<Throwable> errorConsumer) {
        this(connection, requestHandler, clientHonorsLease, errorConsumer, new DisabledEventPublisher());
    }

    /**
     * Creates a socket with {@code clientHonorsLease} set to {@code true} and event publishing
     * disabled.
     *
     * @param connection     transport to read frames from and write frames to
     * @param requestHandler handler all requests are delegated to
     * @param errorConsumer  error sink
     */
    public ServerReactiveSocket(DuplexConnection connection, ReactiveSocket requestHandler, Consumer<Throwable> errorConsumer) {
        this(connection, requestHandler, true, errorConsumer);
    }

    /**
     * Forwards a fire-and-forget request to the request handler.
     *
     * @param payload request payload
     * @return the publisher returned by the handler
     */
    @Override
    public Publisher<Void> fireAndForget(Payload payload) {
        return requestHandler.fireAndForget(payload);
    }

    /**
     * Forwards a request/response call to the request handler.
     *
     * @param payload request payload
     * @return the publisher returned by the handler
     */
    @Override
    public Publisher<Payload> requestResponse(Payload payload) {
        return requestHandler.requestResponse(payload);
    }

    /**
     * Forwards a stream request to the request handler.
     *
     * @param payload request payload
     * @return the publisher returned by the handler
     */
    @Override
    public Publisher<Payload> requestStream(Payload payload) {
        return requestHandler.requestStream(payload);
    }

    /**
     * Forwards a subscription request to the request handler.
     *
     * @param payload request payload
     * @return the publisher returned by the handler
     */
    @Override
    public Publisher<Payload> requestSubscription(Payload payload) {
        return requestHandler.requestSubscription(payload);
    }

    /**
     * Forwards a channel request to the request handler.
     *
     * @param payloads inbound payload stream for the channel
     * @return the publisher returned by the handler
     */
    @Override
    public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
        return requestHandler.requestChannel(payloads);
    }

    /**
     * Forwards a metadata push to the request handler.
     *
     * @param payload pushed payload
     * @return the publisher returned by the handler
     */
    @Override
    public Publisher<Void> metadataPush(Payload payload) {
        return requestHandler.metadataPush(payload);
    }

    /**
     * Builds the close sequence for this socket: a deferred step that runs {@link #cleanup()},
     * concatenated with the publisher returned by {@code connection.close()}.
     *
     * @return the concatenated close publisher
     */
    @Override
    public Publisher<Void> close() {
        Publisher<Void> releaseLocalState = Px.defer(() -> {
            cleanup();
            return Px.empty();
        });
        return Px.concatEmpty(releaseLocalState, connection.close());
    }

    /**
     * Exposes the close signal of the underlying connection.
     *
     * @return {@code connection.onClose()}
     */
    @Override
    public Publisher<Void> onClose() {
        return connection.onClose();
    }

    /**
     * Subscribes to the inbound frame stream and starts dispatching frames.
     *
     * <p>Each frame is passed to {@link #handleFrame(Frame)} and the resulting publisher is
     * subscribed with an error-reporting subscriber. If the inbound stream itself fails, the
     * error is reported and every subscription registered at that moment is cancelled. The
     * subscription to the inbound stream is wrapped and stored in {@code receiversSubscription}.
     *
     * @return this socket
     */
    public ServerReactiveSocket start() {
        Px.from(serverInput)
            .doOnNext(frame -> handleFrame(frame).subscribe(Subscribers.doOnError(errorConsumer)))
            .doOnError(t -> {
                errorConsumer.accept(t);
                // values() is a live view of the map, so take a copy while holding the lock;
                // iterating the view after releasing it would race with concurrent updates
                // and with removals triggered by the cancellations below.
                Subscription[] active;
                synchronized (this) {
                    active = subscriptions.values().toArray(new Subscription[0]);
                }
                for (Subscription registered : active) {
                    registered.cancel();
                }
            })
            .doOnSubscribe(subscription -> {
                receiversSubscription = new Subscription() {
                    @Override
                    public void request(long n) {
                        subscription.request(n);
                    }

                    @Override
                    public void cancel() {
                        subscription.cancel();
                    }
                };
            })
            .subscribe();
        return this;
    }

    /**
     * Dispatches one inbound frame according to its type.
     *
     * <p>Request frames are routed to the matching handler; NEXT, COMPLETE, ERROR and
     * NEXT_COMPLETE frames are forwarded to the channel receiver registered for the stream, if
     * any; RESPONSE and LEASE frames are ignored; a SETUP frame yields an error publisher. Any
     * other type, and any exception thrown while dispatching, is answered through
     * {@link #handleError(int, Throwable)}. For a {@link SetupException} the returned publisher
     * is additionally concatenated with {@code Px.error(t)}.
     *
     * @param frame inbound frame
     * @return publisher representing the work triggered by the frame
     */
    private Publisher<Void> handleFrame(Frame frame) {
        final int streamId = frame.getStreamId();
        try {
            switch (frame.getType()) {
                case SETUP:
                    return Px.error(new IllegalStateException("Setup frame received post setup."));
                case REQUEST_RESPONSE:
                    return handleRequestResponse(streamId, requestResponse(frame));
                case CANCEL:
                    return handleCancelFrame(streamId);
                case KEEPALIVE:
                    return handleKeepAliveFrame(frame);
                case REQUEST_N:
                    return handleRequestN(streamId, frame);
                case REQUEST_STREAM:
                    return doReceive(streamId, requestStream(frame), EventListener.RequestType.RequestStream);
                case FIRE_AND_FORGET:
                    return handleFireAndForget(streamId, fireAndForget(frame));
                case REQUEST_SUBSCRIPTION:
                    // NOTE(review): reported under RequestType.RequestStream, same as
                    // REQUEST_STREAM - confirm this is intended.
                    return doReceive(streamId, requestSubscription(frame), EventListener.RequestType.RequestStream);
                case REQUEST_CHANNEL:
                    return handleChannel(streamId, frame);
                case RESPONSE:
                case LEASE:
                    return Px.empty();
                case METADATA_PUSH:
                    return metadataPush(frame);
                case NEXT: {
                    RemoteReceiver receiver = channelReceiver(streamId);
                    if (receiver != null) {
                        receiver.onNext(frame);
                    }
                    return Px.empty();
                }
                case COMPLETE: {
                    RemoteReceiver receiver = channelReceiver(streamId);
                    if (receiver != null) {
                        receiver.onComplete();
                    }
                    return Px.empty();
                }
                case ERROR: {
                    RemoteReceiver receiver = channelReceiver(streamId);
                    if (receiver != null) {
                        receiver.onError(new ApplicationException(frame));
                    }
                    return Px.empty();
                }
                case NEXT_COMPLETE: {
                    RemoteReceiver receiver = channelReceiver(streamId);
                    if (receiver != null) {
                        receiver.onNext(frame);
                        receiver.onComplete();
                    }
                    return Px.empty();
                }
                default:
                    return handleError(streamId, new IllegalStateException("ServerReactiveSocket: Unexpected frame type: " + frame.getType()));
            }
        } catch (Throwable t) {
            // Boundary catch: whatever went wrong is reported to the peer as an error frame.
            Publisher<Void> errorReply = handleError(streamId, t);
            if (t instanceof SetupException) {
                errorReply = Px.concatEmpty(errorReply, Px.error(t));
            }
            return errorReply;
        }
    }

    /** Looks up the channel receiver for a stream while holding the map's monitor; may return null. */
    private RemoteReceiver channelReceiver(int streamId) {
        synchronized (channelProcessors) {
            return channelProcessors.get(streamId);
        }
    }

    /**
     * Unregisters the channel receiver for the given stream.
     *
     * @param streamId stream whose receiver is removed
     */
    private synchronized void removeChannelProcessor(int streamId) {
        // Frame dispatch reads this map while holding the map's own monitor, so the removal
        // must hold it too; locking only `this` would not exclude those readers.
        synchronized (channelProcessors) {
            channelProcessors.remove(streamId);
        }
    }

    /**
     * Unregisters the subscription tracked for the given stream. Same effect as
     * {@link #removeSubscription(int)}, to which it delegates.
     *
     * @param streamId stream whose subscription is removed
     */
    private synchronized void removeSubscriptions(int streamId) {
        removeSubscription(streamId);
    }

    /**
     * Releases everything this socket tracks: cancels and forgets all registered
     * subscriptions, cancels and forgets all channel receivers, then closes the request
     * handler.
     */
    private synchronized void cleanup() {
        subscriptions.values().forEach(Subscription::cancel);
        subscriptions.clear();
        channelProcessors.values().forEach(RemoteReceiver::cancel);
        // Clear the receiver map itself (not `subscriptions` a second time) so cancelled
        // receivers do not stay registered after cleanup.
        channelProcessors.clear();
        requestHandler.close().subscribe(Subscribers.empty());
    }

    /**
     * Sends the handler's response for a request/response call back over the connection.
     *
     * <p>The response subscription is registered under the stream id when subscribed and
     * unregistered on completion, cancellation or error. Cancellation emits a CANCEL frame and
     * an error emits an ERROR frame in place of a response.
     *
     * @param streamId stream the request arrived on
     * @param response handler result to convert into response frames
     * @return publisher for the send, decorated for event publishing
     */
    private Publisher<Void> handleRequestResponse(int streamId, Publisher<Payload> response) {
        final long startedAt = publishSingleFrameReceiveEvents(streamId, EventListener.RequestType.RequestResponse);
        final Runnable unregister = () -> removeSubscription(streamId);

        Px<Frame> outbound = Px.from(response)
            .doOnSubscribe(subscription -> addSubscription(streamId, subscription))
            // 4096 is the flags argument of the response frame; presumably the "complete"
            // flag - confirm against the frame header constants.
            .map(payload -> Frame.Response.from(streamId, FrameType.RESPONSE, payload.getMetadata(), payload.getData(), 4096))
            .doOnComplete(unregister)
            .emitOnCancelOrError(
                () -> {
                    unregister.run();
                    return Frame.Cancel.from(streamId);
                },
                failure -> {
                    unregister.run();
                    return Frame.Error.from(streamId, failure);
                });

        return Px.from(eventPublishingSocket.decorateSend(streamId, connection.send(outbound), startedAt, EventListener.RequestType.RequestResponse));
    }

    /**
     * Streams the handler's payloads for a stream-style request back over the connection.
     *
     * <p>The payloads are mapped to RESPONSE frames and wrapped in a {@link RemoteSender},
     * which is registered under the stream id so later frames for the stream can reach it.
     *
     * @param streamId    stream the request arrived on
     * @param response    handler result to convert into response frames
     * @param requestType request type reported to the event listener
     * @return publisher for the send, decorated for event publishing
     */
    private Publisher<Void> doReceive(int streamId, Publisher<Payload> response, EventListener.RequestType requestType) {
        long now = publishSingleFrameReceiveEvents(streamId, requestType);
        Px<Frame> frames = Px.from(response).map(payload -> Frame.Response.from(streamId, FrameType.RESPONSE, payload));
        // Register and unregister through the synchronized helpers: every other access to
        // `subscriptions` holds the lock on this socket, and the map is shared with the
        // REQUEST_N / CANCEL handling paths.
        // The last argument is fixed at 2 here, whereas handleChannel passes the initial
        // request-n read from the request frame in that position.
        RemoteSender sender = new RemoteSender(frames, () -> removeSubscription(streamId), streamId, 2);
        addSubscription(streamId, sender);
        return eventPublishingSocket.decorateSend(streamId, connection.send(sender), now, requestType);
    }

    /**
     * Sets up both directions of a channel request.
     *
     * <p>A {@link RemoteReceiver} is created for the inbound side, seeded with the first frame
     * re-framed as NEXT, and registered under the stream id. The handler's output is mapped
     * to RESPONSE frames, wrapped in a {@link RemoteSender} that is registered as the stream's
     * subscription, and sent over the connection.
     *
     * @param streamId   stream the request arrived on
     * @param firstFrame the REQUEST_CHANNEL frame that opened the channel
     * @return publisher for the send, decorated for event publishing
     */
    private Publisher<Void> handleChannel(int streamId, Frame firstFrame) {
        long now = publishSingleFrameReceiveEvents(streamId, EventListener.RequestType.RequestChannel);
        int initialRequestN = Frame.Request.initialRequestN(firstFrame);
        Frame firstAsNext = Frame.Request.from(streamId, FrameType.NEXT, firstFrame, initialRequestN);
        RemoteReceiver receiver = new RemoteReceiver(connection, streamId, () -> removeChannelProcessor(streamId), firstAsNext, receiversSubscription, true);
        // Frame dispatch looks receivers up while holding the map's monitor; registering
        // under the same monitor keeps the put from racing with those lookups.
        synchronized (channelProcessors) {
            channelProcessors.put(streamId, receiver);
        }
        Px<Frame> response = Px.from(requestChannel(eventPublishingSocket.decorateReceive(streamId, receiver, EventListener.RequestType.RequestChannel)))
            .map(payload -> Frame.Response.from(streamId, FrameType.RESPONSE, payload));
        RemoteSender sender = new RemoteSender(response, () -> removeSubscriptions(streamId), streamId, initialRequestN);
        synchronized (this) {
            subscriptions.put(streamId, sender);
        }
        return eventPublishingSocket.decorateSend(streamId, connection.send(sender), now, EventListener.RequestType.RequestChannel);
    }

    /**
     * Tracks the handler's fire-and-forget result for the lifetime of the call.
     *
     * <p>The subscription is registered under the stream id when subscribed and unregistered
     * on completion or error; errors are also passed to the error consumer.
     *
     * @param streamId stream the request arrived on
     * @param result   publisher returned by the handler
     * @return the handler's publisher with the tracking callbacks attached
     */
    private Publisher<Void> handleFireAndForget(int streamId, Publisher<Void> result) {
        final Runnable untrack = () -> removeSubscription(streamId);
        return Px.from(result)
            .doOnSubscribe(subscription -> addSubscription(streamId, subscription))
            .doOnError(failure -> {
                untrack.run();
                errorConsumer.accept(failure);
            })
            .doOnComplete(untrack);
    }

    /**
     * Answers a KEEPALIVE frame when it carries the respond flag; otherwise does nothing.
     *
     * @param frame inbound KEEPALIVE frame
     * @return publisher for the reply send, or an empty publisher when no reply is requested
     */
    private Publisher<Void> handleKeepAliveFrame(Frame frame) {
        if (!Frame.Keepalive.hasRespondFlag(frame)) {
            return Px.empty();
        }
        Frame reply = Frame.Keepalive.from(Frame.NULL_BYTEBUFFER, false);
        return Px.from(connection.sendOne(reply)).doOnError(errorConsumer);
    }

    /**
     * Handles a CANCEL frame: unregisters the subscription tracked for the stream and, if
     * there was one, cancels it outside the lock.
     *
     * @param streamId stream being cancelled
     * @return an empty publisher
     */
    private Publisher<Void> handleCancelFrame(int streamId) {
        final Subscription cancelled;
        synchronized (this) {
            cancelled = subscriptions.remove(streamId);
        }
        if (cancelled != null) {
            cancelled.cancel();
        }
        return Px.empty();
    }

    /**
     * Sends an ERROR frame for the given stream; a failure of that send is passed to the
     * error consumer.
     *
     * @param streamId stream the error belongs to
     * @param t        error to report to the peer
     * @return publisher for the send
     */
    private Publisher<Void> handleError(int streamId, Throwable t) {
        Frame errorFrame = Frame.Error.from(streamId, t);
        return Px.from(connection.sendOne(errorFrame)).doOnError(errorConsumer);
    }

    /**
     * Handles a REQUEST_N frame by requesting the carried amount from the subscription
     * tracked for the stream. A value of {@code Integer.MAX_VALUE} is widened to
     * {@code Long.MAX_VALUE}. Frames for unknown streams are ignored.
     *
     * @param streamId stream the demand applies to
     * @param frame    inbound REQUEST_N frame
     * @return an empty publisher
     */
    private Px<Void> handleRequestN(int streamId, Frame frame) {
        final Subscription target;
        synchronized (this) {
            target = subscriptions.get(streamId);
        }
        if (target == null) {
            return Px.empty();
        }
        final int requested = Frame.RequestN.requestN(frame);
        final long demand;
        if (requested >= Integer.MAX_VALUE) {
            demand = Long.MAX_VALUE;
        } else {
            demand = requested;
        }
        target.request(demand);
        return Px.empty();
    }

    /**
     * Registers a subscription under the given stream id while holding this socket's lock.
     *
     * @param streamId     stream the subscription belongs to
     * @param subscription subscription to track
     */
    private void addSubscription(int streamId, Subscription subscription) {
        synchronized (this) {
            subscriptions.put(streamId, subscription);
        }
    }

    /**
     * Unregisters the subscription tracked for the given stream while holding this socket's
     * lock.
     *
     * @param streamId stream whose subscription is removed
     */
    private void removeSubscription(int streamId) {
        synchronized (this) {
            subscriptions.remove(streamId);
        }
    }

    /**
     * Captures the current clock reading and, when event publishing is enabled, reports the
     * start and completion of receiving a single-frame request to the event listener.
     *
     * @param streamId    stream the request arrived on
     * @param requestType request type to report
     * @return the clock reading taken at the start of this call
     */
    private long publishSingleFrameReceiveEvents(int streamId, EventListener.RequestType requestType) {
        final long startedAt = Clock.now();
        if (!eventPublisher.isEventPublishingEnabled()) {
            return startedAt;
        }
        EventListener listener = eventPublisher.getEventListener();
        listener.requestReceiveStart(streamId, requestType);
        listener.requestReceiveComplete(streamId, requestType, Clock.elapsedSince(startedAt), Clock.unit());
        return startedAt;
    }
}

