/*
 * Decompiled with CFR 0.152.
 */
package kr.jm.utils.flow.publisher;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import kr.jm.utils.flow.publisher.JMListSubmissionPublisher;
import kr.jm.utils.helper.JMLog;
import kr.jm.utils.helper.JMOptional;
import kr.jm.utils.helper.JMThread;
import org.slf4j.Logger;

/**
 * A list publisher that buffers submitted items and forwards them to the
 * parent publisher in bulk.
 *
 * <p>Items accumulate in {@link #dataList}. The buffer is handed to
 * {@code super.submit(List)} when it reaches {@link #bulkSize} items, or
 * when a periodic task finds that the buffer is non-empty and no item has
 * been added for longer than {@link #flushIntervalMillis}.
 *
 * <p>All access to the buffer and to {@link #lastDataTimestamp} made by this
 * class is guarded by a single private lock. The lock is deliberately NOT
 * the buffer itself: {@link #flush()} replaces the buffer instance, so using
 * the buffer as the monitor would let two threads hold "the" lock at once.
 *
 * @param <T> the type of the buffered items
 */
public class BulkSubmissionPublisher<T> extends JMListSubmissionPublisher<T>
        implements AutoCloseable {
    /** Bulk size used by the constructors that do not take one. */
    public static final int DEFAULT_BULK_SIZE = 100;
    /** Flush interval, in seconds, used by the constructors that do not take one. */
    public static final int DEFAULT_FLUSH_INTERVAL_SECONDS = 1;

    /** Number of buffered items at which the buffer is flushed. */
    protected int bulkSize;
    /** Idle time, in milliseconds, after which a non-empty buffer is flushed. */
    protected long flushIntervalMillis;
    /** Current buffer; replaced by a fresh list on every non-empty flush. */
    protected List<T> dataList;
    /**
     * Wall-clock time (ms) at which an item was last buffered;
     * {@code Long.MAX_VALUE} until the first item arrives.
     */
    protected long lastDataTimestamp;

    /** Stable monitor guarding {@code dataList} and {@code lastDataTimestamp}. */
    private final Object lock = new Object();
    private ScheduledFuture<?> scheduledFuture;

    /**
     * Creates a publisher with {@link #DEFAULT_BULK_SIZE} and
     * {@link #DEFAULT_FLUSH_INTERVAL_SECONDS}.
     */
    public BulkSubmissionPublisher() {
        this(DEFAULT_BULK_SIZE);
    }

    /**
     * Creates a publisher with the given bulk size and
     * {@link #DEFAULT_FLUSH_INTERVAL_SECONDS}.
     *
     * @param bulkSize number of buffered items that triggers a flush
     */
    public BulkSubmissionPublisher(int bulkSize) {
        this(bulkSize, DEFAULT_FLUSH_INTERVAL_SECONDS);
    }

    /**
     * Creates a publisher and schedules the periodic idle-flush check.
     *
     * @param bulkSize             number of buffered items that triggers a flush
     * @param flushIntervalSeconds idle time, in seconds, after which a
     *                             non-empty buffer is flushed; also used as the
     *                             initial delay and period of the check
     */
    public BulkSubmissionPublisher(int bulkSize, int flushIntervalSeconds) {
        this.bulkSize = bulkSize;
        // Multiply as long: an int product overflows for intervals above ~24.8 days.
        this.flushIntervalMillis = flushIntervalSeconds * 1000L;
        this.dataList = new ArrayList<>();
        // MAX_VALUE keeps the idle check from firing before any data arrives.
        this.lastDataTimestamp = Long.MAX_VALUE;
        this.scheduledFuture = JMThread.runWithScheduleAtFixedRate(this.flushIntervalMillis, this.flushIntervalMillis, this::checkIntervalAndFlush);
    }

    /**
     * Periodic task: flushes the buffer when it is non-empty and the last
     * item was buffered more than {@code flushIntervalMillis} ago.
     */
    private void checkIntervalAndFlush() {
        // Read the timestamp and buffer under the lock so the scheduler
        // thread sees the values written by submitting threads.
        synchronized (this.lock) {
            if (this.lastDataTimestamp < System.currentTimeMillis() - this.flushIntervalMillis && !this.dataList.isEmpty()) {
                JMLog.warn((Logger)this.log, "checkIntervalAndFlush", new Object[]{this.lastDataTimestamp, this.flushIntervalMillis});
                this.flush();
            }
        }
    }

    /**
     * Buffers the elements of the given array; a {@code null} array is
     * treated as an empty list.
     *
     * @param dataArray the items to buffer, may be {@code null}
     * @return the value returned by {@link #submit(List)} for the
     *         corresponding list
     */
    @Override
    public int submit(T[] dataArray) {
        return this.submit(Optional.ofNullable(dataArray).map(Arrays::asList).orElseGet(Collections::emptyList));
    }

    /**
     * Buffers the given items instead of publishing them immediately.
     *
     * @param itemList the items to buffer
     * @return the size of {@code itemList} when
     *         {@code JMOptional.getOptional} yields a value for it,
     *         otherwise {@code 0}
     */
    @Override
    public int submit(List<T> itemList) {
        // NOTE(review): presumably JMOptional.getOptional is empty for a
        // null or empty list - confirm against JMOptional.
        return JMOptional.getOptional(itemList).map(this::submitBulk).orElse(0);
    }

    /**
     * Adds a batch of items to the buffer.
     *
     * <p>If the batch fits without reaching {@code bulkSize}, it is appended
     * in one step (elements are added as-is, including any {@code null}s).
     * Otherwise the items are fed one by one through
     * {@link #submitSingle(Object)}, which skips {@code null}s and flushes
     * each time the threshold is reached.
     *
     * @param itemList the items to buffer
     * @return the size of {@code itemList}
     */
    private int submitBulk(List<T> itemList) {
        synchronized (this.lock) {
            if (this.dataList.size() + itemList.size() < this.bulkSize) {
                this.dataList.addAll(itemList);
                this.setLastDataTimestamp();
            } else {
                for (T item : itemList) {
                    // Re-enters the same lock; Java monitors are reentrant.
                    this.submitSingle(item);
                }
            }
            return itemList.size();
        }
    }

    /**
     * Adds one item to the buffer and flushes if the buffer has reached
     * {@code bulkSize}.
     *
     * @param item the item to buffer; {@code null} is ignored
     * @return {@code 1} if the item was buffered, {@code 0} if it was
     *         {@code null}
     */
    public int submitSingle(T item) {
        if (Objects.isNull(item)) {
            return 0;
        }
        synchronized (this.lock) {
            this.dataList.add(item);
            this.setLastDataTimestamp();
            if (this.dataList.size() >= this.bulkSize) {
                this.flush();
            }
            return 1;
        }
    }

    /** Records the current wall-clock time; callers hold {@code lock}. */
    private void setLastDataTimestamp() {
        this.lastDataTimestamp = System.currentTimeMillis();
    }

    /**
     * Hands the current buffer to {@code super.submit(List)} and starts a
     * fresh one. Does nothing beyond logging when the buffer is empty.
     */
    public void flush() {
        synchronized (this.lock) {
            // Log inside the lock so the reported size matches what is flushed.
            JMLog.debug((Logger)this.log, "flush", new Object[]{this.dataList.size()});
            if (!this.dataList.isEmpty()) {
                super.submit(this.dataList);
                // The submitted list now belongs to the parent publisher;
                // never reuse it for further buffering.
                this.dataList = new ArrayList<>();
            }
        }
    }

    /**
     * Cancels the periodic idle-flush check without interrupting a check
     * that is already running.
     *
     * <p>Items still in the buffer are not flushed by this method; call
     * {@link #flush()} first if they must be delivered.
     */
    @Override
    public void close() {
        JMLog.info((Logger)this.log, "close");
        this.scheduledFuture.cancel(false);
    }

    /**
     * Delegates to the parent's {@code subscribeWith} and returns this
     * publisher for chaining.
     *
     * @param subscribers the subscribers to pass to the parent
     * @return this publisher
     */
    @Override
    public BulkSubmissionPublisher<T> subscribeWith(Flow.Subscriber<List<T>> ... subscribers) {
        super.subscribeWith(subscribers);
        return this;
    }

    /**
     * Delegates to the parent's {@code consumeWith} and returns this
     * publisher for chaining.
     *
     * @param consumers the consumers to pass to the parent
     * @return this publisher
     */
    @Override
    public BulkSubmissionPublisher<T> consumeWith(Consumer<List<T>> ... consumers) {
        super.consumeWith(consumers);
        return this;
    }
}

