/*
 * Decompiled with CFR 0.152.
 */
package kr.jm.utils.flow.publisher;

import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import kr.jm.utils.enums.OS;
import kr.jm.utils.exception.JMExceptionManager;
import kr.jm.utils.flow.publisher.SubmissionPublisherImplementsJM;
import kr.jm.utils.helper.JMThread;
import org.slf4j.Logger;

public class WaitingSubmissionPublisher<T>
extends SubmissionPublisherImplementsJM<T> {
    private int queueSizeLimit;

    public WaitingSubmissionPublisher() {
        this(WaitingSubmissionPublisher.getDefaultQueueSizeLimit());
    }

    public WaitingSubmissionPublisher(int queueSizeLimit) {
        this(null, queueSizeLimit);
    }

    public WaitingSubmissionPublisher(Executor executor) {
        this(executor, WaitingSubmissionPublisher.getDefaultQueueSizeLimit());
    }

    public static int getDefaultQueueSizeLimit() {
        return OS.getAvailableProcessors() * 8;
    }

    public WaitingSubmissionPublisher(Executor executor, int queueSizeLimit) {
        super(Objects.isNull(executor) ? JMThread.newSingleThreadPool() : executor, queueSizeLimit);
        this.queueSizeLimit = queueSizeLimit;
    }

    private boolean checkSuspendCondition(int lag, int waitCount) {
        if (this.isClosed() || lag < this.queueSizeLimit) {
            return false;
        }
        this.logWaiting(lag, waitCount);
        return true;
    }

    private boolean checkSuspendCondition(AtomicInteger waitCount) {
        return this.checkSuspendCondition(this.estimateMaximumLag(), waitCount.incrementAndGet());
    }

    private void logWaiting(int lag, int waitCount) {
        if (100 % waitCount == 0 || waitCount % 600 == 0) {
            this.log.warn("Wait Occur !!! - queueSizeLimit = {}, lag = {}", (Object)this.queueSizeLimit, (Object)lag);
        }
    }

    @Override
    public int submit(T data) {
        if (this.isClosed() || Objects.isNull(data)) {
            return 0;
        }
        AtomicInteger waitCount = new AtomicInteger();
        JMThread.suspend((long)100L, () -> this.checkSuspendCondition(waitCount));
        try {
            return super.submit(data);
        }
        catch (Exception e) {
            return (Integer)JMExceptionManager.handleExceptionAndReturn((Logger)this.log, (Throwable)e, (String)"submit", () -> 0, (Object[])new Object[]{data});
        }
    }

    public int getSizeLimit() {
        return this.queueSizeLimit;
    }
}

