/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.lease;

import io.reactivesocket.Frame;
import io.reactivesocket.lease.DefaultLeaseEnforcingSocket;
import io.reactivesocket.lease.Lease;
import io.reactivesocket.lease.LeaseImpl;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.Cancellable;
import io.reactivesocket.reactivestreams.extensions.internal.CancellableImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

/**
 * A {@link DefaultLeaseEnforcingSocket.LeaseDistributor} that splits the currently available
 * capacity evenly between all registered lease recipients.
 *
 * <p>On every item emitted by the tick publisher the capacity supplier is queried and the
 * resulting number of permits is divided between the registered recipients; any remainder is
 * handed out one permit at a time to the first recipients in registration order.
 *
 * <p>The tick publisher is subscribed to lazily, when the first recipient registers.
 */
public final class FairLeaseDistributor implements DefaultLeaseEnforcingSocket.LeaseDistributor {
    /** Recipients that currently receive leases, in registration order. */
    private final LinkedBlockingQueue<Consumer<Lease>> activeRecipients;
    /**
     * Subscription to the tick stream; {@code null} until the stream has been subscribed to.
     * Volatile because it is written from the subscribe callback and read from {@link #shutdown()}.
     */
    private volatile Subscription ticksSubscription;
    /** Set once the tick stream has been started; guarded by {@code this} when written. */
    private volatile boolean startTicks;
    private final IntSupplier capacitySupplier;
    /** TTL placed on every lease sent out: the configured TTL scaled by 1.1. */
    private final int leaseTTLMillis;
    private final Publisher<Long> leaseDistributionTicks;
    private final boolean redistributeOnConnect;

    /**
     * Creates a distributor.
     *
     * @param capacitySupplier supplies the total number of permits to hand out per distribution
     * @param leaseTTLMillis nominal lease time-to-live in milliseconds; the leases actually sent
     *     carry this value multiplied by 1.1 (presumably slack so a lease does not expire before
     *     the next tick arrives - TODO confirm intent)
     * @param leaseDistributionTicks each emitted item triggers one distribution
     * @param redistributeOnConnect whether registering a recipient after the ticks have started
     *     triggers an immediate distribution
     */
    public FairLeaseDistributor(IntSupplier capacitySupplier, int leaseTTLMillis, Publisher<Long> leaseDistributionTicks, boolean redistributeOnConnect) {
        this.capacitySupplier = capacitySupplier;
        this.leaseTTLMillis = (int)((double)leaseTTLMillis * 1.1);
        this.leaseDistributionTicks = leaseDistributionTicks;
        this.redistributeOnConnect = redistributeOnConnect;
        this.activeRecipients = new LinkedBlockingQueue<>();
    }

    /**
     * Creates a distributor that redistributes leases whenever a recipient registers after the
     * ticks have started.
     *
     * @param capacitySupplier supplies the total number of permits to hand out per distribution
     * @param leaseTTLMillis nominal lease time-to-live in milliseconds
     * @param leaseDistributionTicks each emitted item triggers one distribution
     */
    public FairLeaseDistributor(IntSupplier capacitySupplier, int leaseTTLMillis, Publisher<Long> leaseDistributionTicks) {
        this(capacitySupplier, leaseTTLMillis, leaseDistributionTicks, true);
    }

    /**
     * Cancels the tick subscription, if one has been established.
     *
     * <p>Calling this before any recipient has registered (i.e. before the tick stream was
     * subscribed to) is a no-op rather than a {@link NullPointerException}.
     */
    @Override
    public void shutdown() {
        // Read the field once so the null check and the call see the same value.
        Subscription subscription = this.ticksSubscription;
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /**
     * Registers a recipient of leases and starts the tick stream if this is the first
     * registration.
     *
     * @param leaseConsumer callback invoked with every lease granted to this recipient
     * @return a handle whose cancellation removes the recipient from future distributions
     */
    @Override
    public Cancellable registerSocket(final Consumer<Lease> leaseConsumer) {
        this.activeRecipients.add(leaseConsumer);
        boolean alreadyStarted;
        synchronized (this) {
            alreadyStarted = this.startTicks;
            if (!alreadyStarted) {
                this.startTicks();
                this.startTicks = true;
            }
        }
        // The very first registration only starts the ticks; later ones may redistribute at once.
        if (alreadyStarted && this.redistributeOnConnect) {
            this.distribute(this.capacitySupplier.getAsInt());
        }
        return new CancellableImpl(){

            protected void onCancel() {
                FairLeaseDistributor.this.activeRecipients.remove(leaseConsumer);
            }
        };
    }

    /**
     * Splits {@code permits} between the currently registered recipients and sends each one its
     * lease. Does nothing when there are no recipients.
     *
     * @param permits total number of permits to hand out
     */
    private void distribute(int permits) {
        // Work on a snapshot: recipients can be added or removed concurrently, and computing the
        // share from one view of the queue while iterating another could divide by zero or hand
        // out a total that differs from the requested permits.
        List<Consumer<Lease>> recipients = new ArrayList<>(this.activeRecipients);
        int recipientCount = recipients.size();
        if (recipientCount == 0) {
            return;
        }
        int budget = permits / recipientCount;
        int extra = permits - budget * recipientCount;
        // Shared by every recipient that does not receive an extra permit.
        LeaseImpl budgetLease = new LeaseImpl(budget, this.leaseTTLMillis, Frame.NULL_BYTEBUFFER);
        for (Consumer<Lease> recipient : recipients) {
            LeaseImpl leaseToSend = budgetLease;
            if (extra > 0) {
                --extra;
                leaseToSend = new LeaseImpl(budget + 1, this.leaseTTLMillis, Frame.NULL_BYTEBUFFER);
            }
            recipient.accept(leaseToSend);
        }
    }

    /**
     * Subscribes to the tick publisher, remembering the subscription so that
     * {@link #shutdown()} can cancel it, and runs a distribution for every emitted tick.
     */
    private void startTicks() {
        Px.from(this.leaseDistributionTicks).doOnSubscribe(subscription -> {
            this.ticksSubscription = subscription;
        }).doOnNext(aLong -> this.distribute(this.capacitySupplier.getAsInt())).ignore().subscribe();
    }
}

