/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.client;

import io.reactivesocket.exceptions.ConnectionException;
import io.reactivesocket.reactivestreams.extensions.Px;
import java.util.function.LongSupplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public final class KeepAliveProvider {
    private volatile boolean ackThresholdBreached;
    private volatile long lastKeepAliveMillis;
    private volatile long lastAckMillis;
    private final Publisher<Long> ticks = s -> ticks.subscribe((Subscriber)new Subscriber<Long>(){
        private Subscription subscription;

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            s.onSubscribe(subscription);
        }

        public void onNext(Long aLong) {
            KeepAliveProvider.this.updateAckBreachThreshold();
            if (KeepAliveProvider.this.ackThresholdBreached) {
                this.onError(new ConnectionException("Missing keep alive from the peer."));
                this.subscription.cancel();
            } else {
                KeepAliveProvider.this.lastKeepAliveMillis = currentTimeSupplier.getAsLong();
                s.onNext((Object)aLong);
            }
        }

        public void onError(Throwable t) {
            s.onError(t);
        }

        public void onComplete() {
            s.onComplete();
        }
    });
    private final int keepAlivePeriodMillis;
    private final int missedKeepAliveThreshold;
    private final LongSupplier currentTimeSupplier;

    private KeepAliveProvider(Publisher<Long> ticks, int keepAlivePeriodMillis, int missedKeepAliveThreshold, final LongSupplier currentTimeSupplier) {
        this.keepAlivePeriodMillis = keepAlivePeriodMillis;
        this.missedKeepAliveThreshold = missedKeepAliveThreshold;
        this.currentTimeSupplier = currentTimeSupplier;
    }

    public Publisher<Long> ticks() {
        return this.ticks;
    }

    public void ack() {
        this.lastAckMillis = this.currentTimeSupplier.getAsLong();
        this.updateAckBreachThreshold();
    }

    public int getKeepAlivePeriodMillis() {
        return this.keepAlivePeriodMillis;
    }

    public int getMissedKeepAliveThreshold() {
        return this.missedKeepAliveThreshold;
    }

    public static KeepAliveProvider never() {
        return KeepAliveProvider.from(Integer.MAX_VALUE, (Publisher<Long>)Px.never());
    }

    public static KeepAliveProvider from(int keepAlivePeriodMillis, Publisher<Long> keepAliveTicks) {
        return KeepAliveProvider.from(keepAlivePeriodMillis, 3, keepAliveTicks);
    }

    public static KeepAliveProvider from(int keepAlivePeriodMillis, int missedKeepAliveThreshold, Publisher<Long> keepAliveTicks) {
        return KeepAliveProvider.from(keepAlivePeriodMillis, missedKeepAliveThreshold, keepAliveTicks, System::currentTimeMillis);
    }

    public static KeepAliveProvider from(int keepAlivePeriodMillis, int missedKeepAliveThreshold, Publisher<Long> keepAliveTicks, LongSupplier currentTimeSupplier) {
        return new KeepAliveProvider(keepAliveTicks, keepAlivePeriodMillis, missedKeepAliveThreshold, currentTimeSupplier);
    }

    private void updateAckBreachThreshold() {
        long missedAcks = (this.lastAckMillis - this.lastKeepAliveMillis) / (long)this.keepAlivePeriodMillis;
        if (missedAcks < 0L || missedAcks > (long)this.missedKeepAliveThreshold) {
            this.ackThresholdBreached = true;
        }
    }
}

