/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.internal;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
import org.agrona.BitUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Splits the frames received from a single {@link DuplexConnection} into two
 * streams, selected by the parity of each frame's stream id.
 *
 * <p>Frames with an even stream id are delivered to the "server" input, frames
 * with an odd stream id to the "client" input, and frames with stream id
 * {@code 0} are delivered to both. The underlying connection is only subscribed
 * to once both inputs have a subscriber.
 */
public class ClientServerInputMultiplexer {
    private final SourceInput sourceInput;

    /**
     * Creates a multiplexer over the given connection.
     *
     * @param source connection whose received frames are split by stream id parity
     */
    public ClientServerInputMultiplexer(DuplexConnection source) {
        this.sourceInput = new SourceInput(source);
    }

    /**
     * Returns the stream of frames with an even stream id (plus stream id {@code 0}).
     *
     * @return publisher that accepts a single subscriber
     */
    public Publisher<Frame> getServerInput() {
        return this.sourceInput.evenStream();
    }

    /**
     * Returns the stream of frames with an odd stream id (plus stream id {@code 0}).
     *
     * @return publisher that accepts a single subscriber
     */
    public Publisher<Frame> getClientInput() {
        return this.sourceInput.oddStream();
    }

    /**
     * Returns a connection view whose {@code receive()} yields the server input;
     * every other operation is forwarded to the underlying connection.
     *
     * @return a new connection view on each call
     */
    public DuplexConnection asServerConnection() {
        return new InternalDuplexConnection(this.getServerInput());
    }

    /**
     * Returns a connection view whose {@code receive()} yields the client input;
     * every other operation is forwarded to the underlying connection.
     *
     * @return a new connection view on each call
     */
    public DuplexConnection asClientConnection() {
        return new InternalDuplexConnection(this.getClientInput());
    }

    /**
     * View of the underlying connection that swaps the received frames for one
     * of the two filtered inputs and delegates everything else.
     */
    private class InternalDuplexConnection
    implements DuplexConnection {
        private final Publisher<Frame> input;

        public InternalDuplexConnection(Publisher<Frame> input) {
            this.input = input;
        }

        @Override
        public Publisher<Void> send(Publisher<Frame> frame) {
            return ClientServerInputMultiplexer.this.sourceInput.source.send(frame);
        }

        @Override
        public Publisher<Frame> receive() {
            return this.input;
        }

        @Override
        public double availability() {
            return ClientServerInputMultiplexer.this.sourceInput.source.availability();
        }

        @Override
        public Publisher<Void> close() {
            return ClientServerInputMultiplexer.this.sourceInput.source.close();
        }

        @Override
        public Publisher<Void> onClose() {
            return ClientServerInputMultiplexer.this.sourceInput.source.onClose();
        }
    }

    /**
     * Subscriber of the underlying connection's frames that fans them out to at
     * most one "odd" and one "even" downstream subscriber.
     *
     * <p>{@code subscriberCount} is only read and written while holding this
     * object's monitor; the subscription fields are volatile because they are
     * read outside of it.
     */
    private static final class SourceInput
    implements Subscriber<Frame> {
        private final DuplexConnection source;
        private int subscriberCount;
        private volatile Subscription sourceSubscription;
        private volatile ValidatingSubscription<? super Frame> oddSubscription;
        private volatile ValidatingSubscription<? super Frame> evenSubscription;

        public SourceInput(DuplexConnection source) {
            this.source = source;
        }

        /**
         * Records the upstream subscription and hands each downstream subscriber
         * its own subscription. A second upstream subscription is cancelled and
         * the one already held stays in use.
         */
        @Override
        public void onSubscribe(Subscription s) {
            boolean duplicate;
            synchronized (this) {
                duplicate = this.sourceSubscription != null;
                if (!duplicate) {
                    // Only adopt the first subscription; overwriting the active one
                    // with a subscription that is about to be cancelled would route
                    // all later request/cancel calls to a dead subscription.
                    this.sourceSubscription = s;
                }
            }
            if (duplicate) {
                s.cancel();
                return;
            }
            // The source is only subscribed to after both sides registered (see
            // subscribe), so both downstream subscriptions are set here.
            this.oddSubscription.getSubscriber().onSubscribe(this.oddSubscription);
            this.evenSubscription.getSubscriber().onSubscribe(this.evenSubscription);
        }

        /**
         * Routes a frame by stream id: {@code 0} goes to both sides, even ids to
         * the even side, odd ids to the odd side.
         */
        @Override
        public void onNext(Frame frame) {
            if (frame.getStreamId() == 0) {
                this.evenSubscription.safeOnNext(frame);
                this.oddSubscription.safeOnNext(frame);
            } else if (BitUtil.isEven((int)frame.getStreamId())) {
                this.evenSubscription.safeOnNext(frame);
            } else {
                this.oddSubscription.safeOnNext(frame);
            }
        }

        /** Forwards the upstream error to both sides. */
        @Override
        public void onError(Throwable t) {
            this.oddSubscription.safeOnError(t);
            this.evenSubscription.safeOnError(t);
        }

        /** Forwards upstream completion to both sides. */
        @Override
        public void onComplete() {
            this.oddSubscription.safeOnComplete();
            this.evenSubscription.safeOnComplete();
        }

        /** Returns a publisher that registers its subscriber on the odd side. */
        public Publisher<Frame> oddStream() {
            return s -> this.subscribe(s, true);
        }

        /** Returns a publisher that registers its subscriber on the even side. */
        public Publisher<Frame> evenStream() {
            return s -> this.subscribe(s, false);
        }

        /**
         * Registers {@code s} on the requested side and subscribes to the source
         * once both sides are registered.
         *
         * <p>A subscriber is rejected with an {@link IllegalStateException} via
         * {@code onError} when its side is already taken or when two subscribers
         * are already registered. Rejected subscribers do not affect the count.
         *
         * @param s   downstream subscriber
         * @param odd {@code true} for the odd side, {@code false} for the even side
         */
        private void subscribe(Subscriber<? super Frame> s, boolean odd) {
            IllegalStateException sendError = null;
            boolean subscribeUp = false;
            synchronized (this) {
                if (this.subscriberCount == 0 || this.subscriberCount == 1) {
                    boolean sideTaken = odd
                            ? this.oddSubscription != null
                            : this.evenSubscription != null;
                    if (sideTaken) {
                        sendError = new IllegalStateException("An active subscription already exists.");
                    } else {
                        if (odd) {
                            this.oddSubscription = this.newSubscription(s);
                        } else {
                            this.evenSubscription = this.newSubscription(s);
                        }
                        // Count only successful registrations; counting a rejected
                        // subscriber would keep the source from being subscribed to
                        // (or cancelled) at the right time.
                        ++this.subscriberCount;
                        subscribeUp = this.subscriberCount == 2;
                    }
                } else {
                    sendError = new IllegalStateException("More than 2 subscribers received.");
                }
            }
            // Signals are emitted outside the lock so subscriber code never runs
            // while the monitor is held.
            if (sendError != null) {
                // NOTE(review): the rejected subscriber gets onError without a prior
                // onSubscribe; Reactive Streams expects onSubscribe first. Left as is
                // to keep the signal order existing subscribers observe - confirm.
                s.onError(sendError);
            } else if (subscribeUp) {
                this.source.receive().subscribe(this);
            }
        }

        /**
         * Creates the downstream subscription for {@code s}. Cancelling it
         * decrements the subscriber count and cancels the upstream subscription
         * when the count reaches zero; requests are passed straight upstream.
         */
        private ValidatingSubscription<? super Frame> newSubscription(Subscriber<? super Frame> s) {
            return ValidatingSubscription.create(s, () -> {
                boolean cancelUp;
                synchronized (this) {
                    cancelUp = --this.subscriberCount == 0;
                }
                if (cancelUp) {
                    this.sourceSubscription.cancel();
                }
            }, requestN -> this.sourceSubscription.request(requestN));
        }
    }
}

