/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.FrameType;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.StreamIdSupplier;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.events.EventListener;
import io.reactivesocket.events.EventPublishingSocket;
import io.reactivesocket.events.EventPublishingSocketImpl;
import io.reactivesocket.exceptions.CancelException;
import io.reactivesocket.exceptions.Exceptions;
import io.reactivesocket.internal.DisabledEventPublisher;
import io.reactivesocket.internal.EventPublisher;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.RemoteReceiver;
import io.reactivesocket.internal.RemoteSender;
import io.reactivesocket.lease.Lease;
import io.reactivesocket.lease.LeaseImpl;
import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.FlowControlHelper;
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import org.agrona.collections.Int2ObjectHashMap;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ClientReactiveSocket
implements ReactiveSocket {
    private final DuplexConnection connection;
    private final Consumer<Throwable> errorConsumer;
    private final StreamIdSupplier streamIdSupplier;
    private final KeepAliveProvider keepAliveProvider;
    private final EventPublishingSocket eventPublishingSocket;
    private final Int2ObjectHashMap<Subscription> senders;
    private final Int2ObjectHashMap<Subscriber<Frame>> receivers;
    private final BufferingSubscription transportReceiveSubscription = new BufferingSubscription();
    private CancellableSubscriber<Void> keepAliveSendSub;
    private volatile Consumer<Lease> leaseConsumer;

    public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> errorConsumer, StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider, EventPublisher<? extends EventListener> publisher) {
        this.connection = connection;
        this.errorConsumer = new KnownErrorFilter(errorConsumer);
        this.streamIdSupplier = streamIdSupplier;
        this.keepAliveProvider = keepAliveProvider;
        this.eventPublishingSocket = publisher.isEventPublishingEnabled() ? new EventPublishingSocketImpl(publisher, true) : EventPublishingSocket.DISABLED;
        this.senders = new Int2ObjectHashMap(256, 0.9f);
        this.receivers = new Int2ObjectHashMap(256, 0.9f);
        connection.onClose().subscribe((Subscriber)Subscribers.cleanup(() -> this.cleanup()));
    }

    public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> errorConsumer, StreamIdSupplier streamIdSupplier, KeepAliveProvider keepAliveProvider) {
        this(connection, errorConsumer, streamIdSupplier, keepAliveProvider, new DisabledEventPublisher());
    }

    @Override
    public Publisher<Void> fireAndForget(Payload payload) {
        return Px.defer(() -> {
            int streamId = this.nextStreamId();
            Frame requestFrame = Frame.Request.from(streamId, FrameType.FIRE_AND_FORGET, payload, 0);
            return this.connection.sendOne(requestFrame);
        });
    }

    @Override
    public Publisher<Payload> requestResponse(Payload payload) {
        return this.handleRequestResponse(payload);
    }

    @Override
    public Publisher<Payload> requestStream(Payload payload) {
        return this.handleStreamResponse((Px<Payload>)Px.just((Object)payload), FrameType.REQUEST_STREAM);
    }

    @Override
    public Publisher<Payload> requestSubscription(Payload payload) {
        return this.handleStreamResponse((Px<Payload>)Px.just((Object)payload), FrameType.REQUEST_SUBSCRIPTION);
    }

    @Override
    public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
        return this.handleStreamResponse((Px<Payload>)Px.from(payloads), FrameType.REQUEST_CHANNEL);
    }

    @Override
    public Publisher<Void> metadataPush(Payload payload) {
        Frame requestFrame = Frame.Request.from(0, FrameType.METADATA_PUSH, payload, 0);
        return this.connection.sendOne(requestFrame);
    }

    @Override
    public double availability() {
        return this.connection.availability();
    }

    @Override
    public Publisher<Void> close() {
        return Px.concatEmpty((Publisher)Px.defer(() -> {
            this.cleanup();
            return Px.empty();
        }), this.connection.close());
    }

    @Override
    public Publisher<Void> onClose() {
        return this.connection.onClose();
    }

    public ClientReactiveSocket start(Consumer<Lease> leaseConsumer) {
        this.leaseConsumer = leaseConsumer;
        this.startKeepAlive();
        this.startReceivingRequests();
        return this;
    }

    private Publisher<Payload> handleRequestResponse(Payload payload) {
        return Px.create(subscriber -> {
            int streamId = this.nextStreamId();
            Frame requestFrame = Frame.Request.from(streamId, FrameType.REQUEST_RESPONSE, payload, 1);
            ClientReactiveSocket clientReactiveSocket = this;
            synchronized (clientReactiveSocket) {
                Subscriber raw;
                Subscriber fs = raw = subscriber;
                this.receivers.put(streamId, (Object)fs);
            }
            Publisher<Void> send = this.eventPublishingSocket.decorateSend(streamId, this.connection.sendOne(requestFrame), 0L, EventListener.RequestType.RequestResponse);
            this.eventPublishingSocket.decorateReceive(streamId, Px.concatEmpty(send, (Publisher)Px.never()).cast(Payload.class).doOnCancel(() -> {
                if (this.connection.availability() > 0.0) {
                    this.connection.sendOne(Frame.Cancel.from(streamId)).subscribe(DefaultSubscriber.defaultInstance());
                }
                this.removeReceiver(streamId);
            }), EventListener.RequestType.RequestResponse).subscribe(subscriber);
        });
    }

    private Publisher<Payload> handleStreamResponse(Px<Payload> request, FrameType requestType) {
        return Px.defer(() -> {
            int streamId = this.nextStreamId();
            RemoteSender sender = new RemoteSender((Publisher<Frame>)request.map(payload -> Frame.Request.from(streamId, requestType, payload, 1)), this.removeSenderLambda(streamId), streamId, 1);
            Publisher src = s -> {
                CancellableSubscriber sendSub = Subscribers.doOnError(throwable -> s.onError(throwable));
                ValidatingSubscription sub = ValidatingSubscription.create((Subscriber)s, () -> sendSub.cancel(), requestN -> this.transportReceiveSubscription.request(requestN));
                this.eventPublishingSocket.decorateSend(streamId, this.connection.send((Publisher<Frame>)sender), 0L, EventListener.RequestType.fromFrameType(requestType)).subscribe((Subscriber)sendSub);
                s.onSubscribe((Subscription)sub);
            };
            RemoteReceiver receiver = new RemoteReceiver((Publisher<Frame>)src, this.connection, streamId, this.removeReceiverLambda(streamId), true);
            this.registerSenderReceiver(streamId, sender, (Subscriber<Frame>)receiver);
            return this.eventPublishingSocket.decorateReceive(streamId, receiver, EventListener.RequestType.fromFrameType(requestType));
        });
    }

    private void startKeepAlive() {
        this.keepAliveSendSub = Subscribers.doOnError(this.errorConsumer);
        this.connection.send((Publisher<Frame>)Px.from(this.keepAliveProvider.ticks()).map(i -> Frame.Keepalive.from(Frame.NULL_BYTEBUFFER, true))).subscribe(this.keepAliveSendSub);
    }

    private void startReceivingRequests() {
        Px.from(this.connection.receive()).doOnSubscribe(subscription -> this.transportReceiveSubscription.switchTo(subscription)).doOnNext(this::handleIncomingFrames).subscribe();
    }

    protected void cleanup() {
        if (null != this.keepAliveSendSub) {
            this.keepAliveSendSub.cancel();
        }
        this.transportReceiveSubscription.cancel();
    }

    private void handleIncomingFrames(Frame frame) {
        int streamId = frame.getStreamId();
        FrameType type = frame.getType();
        if (streamId == 0) {
            this.handleStreamZero(type, frame);
        } else {
            this.handleFrame(streamId, type, frame);
        }
    }

    private void handleStreamZero(FrameType type, Frame frame) {
        switch (type) {
            case ERROR: {
                throw Exceptions.from(frame);
            }
            case LEASE: {
                if (this.leaseConsumer == null) break;
                this.leaseConsumer.accept(new LeaseImpl(frame));
                break;
            }
            case KEEPALIVE: {
                if (Frame.Keepalive.hasRespondFlag(frame)) break;
                this.keepAliveProvider.ack();
                break;
            }
            default: {
                this.errorConsumer.accept(new IllegalStateException("Client received supported frame on stream 0: " + frame.toString()));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleFrame(int streamId, FrameType type, Frame frame) {
        Subscriber receiver;
        ClientReactiveSocket clientReactiveSocket = this;
        synchronized (clientReactiveSocket) {
            receiver = (Subscriber)this.receivers.get(streamId);
        }
        if (receiver == null) {
            this.handleMissingResponseProcessor(streamId, type, frame);
        } else {
            switch (type) {
                case ERROR: {
                    receiver.onError((Throwable)Exceptions.from(frame));
                    clientReactiveSocket = this;
                    synchronized (clientReactiveSocket) {
                        this.receivers.remove(streamId);
                        break;
                    }
                }
                case NEXT_COMPLETE: {
                    receiver.onNext((Object)frame);
                    receiver.onComplete();
                    clientReactiveSocket = this;
                    synchronized (clientReactiveSocket) {
                        this.receivers.remove(streamId);
                        break;
                    }
                }
                case CANCEL: {
                    Subscription sender;
                    ClientReactiveSocket clientReactiveSocket2 = this;
                    synchronized (clientReactiveSocket2) {
                        sender = (Subscription)this.senders.remove(streamId);
                        this.receivers.remove(streamId);
                    }
                    if (sender != null) {
                        sender.cancel();
                    }
                    receiver.onError((Throwable)new CancelException("cancelling stream id " + streamId));
                    break;
                }
                case NEXT: {
                    receiver.onNext((Object)frame);
                    break;
                }
                case REQUEST_N: {
                    Subscription sender;
                    ClientReactiveSocket clientReactiveSocket3 = this;
                    synchronized (clientReactiveSocket3) {
                        sender = (Subscription)this.senders.get(streamId);
                    }
                    if (sender == null) break;
                    int n = Frame.RequestN.requestN(frame);
                    sender.request((long)n);
                    break;
                }
                case COMPLETE: {
                    receiver.onComplete();
                    clientReactiveSocket = this;
                    synchronized (clientReactiveSocket) {
                        this.receivers.remove(streamId);
                        break;
                    }
                }
                default: {
                    throw new IllegalStateException("Client received supported frame on stream " + streamId + ": " + frame.toString());
                }
            }
        }
    }

    private void handleMissingResponseProcessor(int streamId, FrameType type, Frame frame) {
        if (!this.streamIdSupplier.isBeforeOrCurrent(streamId)) {
            if (type == FrameType.ERROR) {
                String errorMessage = ClientReactiveSocket.getByteBufferAsString(frame.getData());
                throw new IllegalStateException("Client received error for non-existent stream: " + streamId + " Message: " + errorMessage);
            }
            throw new IllegalStateException("Client received message for non-existent stream: " + streamId + ", frame type: " + (Object)((Object)type));
        }
    }

    private int nextStreamId() {
        return this.streamIdSupplier.nextStreamId();
    }

    private static String getByteBufferAsString(ByteBuffer bb) {
        byte[] bytes = new byte[bb.remaining()];
        bb.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    private Runnable removeReceiverLambda(int streamId) {
        return () -> this.removeReceiver(streamId);
    }

    private synchronized void removeReceiver(int streamId) {
        this.receivers.remove(streamId);
    }

    private Runnable removeSenderLambda(int streamId) {
        return () -> this.removeSender(streamId);
    }

    private synchronized void removeSender(int streamId) {
        this.senders.remove(streamId);
    }

    private synchronized void registerSenderReceiver(int streamId, Subscription sender, Subscriber<Frame> receiver) {
        this.senders.put(streamId, (Object)sender);
        this.receivers.put(streamId, receiver);
    }

    private static class BufferingSubscription
    implements Subscription {
        private int requested;
        private boolean cancelled;
        private Subscription delegate;

        private BufferingSubscription() {
        }

        public void request(long n) {
            if (this.relay()) {
                this.delegate.request(n);
            } else {
                this.requested = FlowControlHelper.incrementRequestN((int)this.requested, (long)n);
            }
        }

        public void cancel() {
            if (this.relay()) {
                this.delegate.cancel();
            } else {
                this.cancelled = true;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void switchTo(Subscription subscription) {
            BufferingSubscription bufferingSubscription = this;
            synchronized (bufferingSubscription) {
                this.delegate = subscription;
            }
            if (this.requested > 0) {
                subscription.request((long)this.requested);
            }
            if (this.cancelled) {
                subscription.cancel();
            }
        }

        private synchronized boolean relay() {
            return this.delegate != null;
        }
    }
}

