/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.lease;

import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.exceptions.RejectedException;
import io.reactivesocket.lease.DefaultLeaseHonoringSocket;
import io.reactivesocket.lease.Lease;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.Cancellable;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class DefaultLeaseEnforcingSocket
extends DefaultLeaseHonoringSocket
implements LeaseEnforcingSocket {
    private final LeaseDistributor leaseDistributor;
    private volatile Consumer<Lease> leaseSender;
    private Cancellable distributorCancellation;
    private final Px rejectError;

    public DefaultLeaseEnforcingSocket(ReactiveSocket delegate, LeaseDistributor leaseDistributor, LongSupplier currentTimeSupplier, boolean clientHonorsLeases) {
        super(delegate, currentTimeSupplier);
        this.leaseDistributor = leaseDistributor;
        this.rejectError = !clientHonorsLeases ? Px.error((Throwable)new RejectedException("Server overloaded.")) : null;
    }

    public DefaultLeaseEnforcingSocket(ReactiveSocket delegate, LeaseDistributor leaseDistributor, LongSupplier currentTimeSupplier) {
        this(delegate, leaseDistributor, currentTimeSupplier, true);
    }

    public DefaultLeaseEnforcingSocket(ReactiveSocket delegate, LeaseDistributor leaseDistributor) {
        this(delegate, leaseDistributor, System::currentTimeMillis);
    }

    @Override
    public void acceptLeaseSender(Consumer<Lease> leaseSender) {
        this.leaseSender = leaseSender;
        this.distributorCancellation = this.leaseDistributor.registerSocket(lease -> this.accept((Lease)lease));
        this.onClose().subscribe((Subscriber)Subscribers.doOnTerminate(() -> this.distributorCancellation.cancel()));
    }

    @Override
    public void accept(Lease lease) {
        this.leaseSender.accept(lease);
        super.accept(lease);
    }

    public LeaseDistributor getLeaseDistributor() {
        return this.leaseDistributor;
    }

    @Override
    public Publisher<Void> close() {
        return Px.from(super.close()).doOnSubscribe(subscription -> this.leaseDistributor.shutdown());
    }

    @Override
    protected <T> Publisher<T> rejectError() {
        return null == this.rejectError ? super.rejectError() : this.rejectError;
    }

    public static interface LeaseDistributor {
        public void shutdown();

        public Cancellable registerSocket(Consumer<Lease> var1);
    }
}

