/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.virtualnode.writebehind.base;

import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.storage.Datarouter;
import io.datarouter.storage.node.op.NodeOps;
import io.datarouter.util.collection.CollectionTool;
import io.datarouter.util.collection.ListTool;
import io.datarouter.util.concurrent.FutureTool;
import io.datarouter.virtualnode.writebehind.WriteBehindNode;
import io.datarouter.virtualnode.writebehind.base.OutstandingWriteWrapper;
import io.datarouter.virtualnode.writebehind.base.OverdueWriteCanceller;
import io.datarouter.virtualnode.writebehind.base.WriteWrapper;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for nodes that buffer writes in memory and apply them to a backing node asynchronously.
 *
 * <p>Writes are placed on the queue returned by {@link #getQueue()} as {@link WriteWrapper}s. A periodic task
 * drains that queue every {@link #FLUSH_RATE_MS} milliseconds, merges consecutive wrappers that share the same op
 * and carry no config into batches of at most {@code FLUSH_BATCH_SIZE} objects, and submits each batch to the
 * write-behind executor obtained from {@link Datarouter}. Every submitted write is also recorded in
 * {@link #outstandingWrites} together with its submission time; an {@link OverdueWriteCanceller} is scheduled
 * once per second against this node (presumably to cancel writes older than {@link #timeoutMs} - confirm in
 * that class).
 *
 * @param <PK> primary key type
 * @param <D> databean type
 * @param <N> type of the backing node
 */
public abstract class BaseWriteBehindNode<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, N extends NodeOps<PK, D>>
implements WriteBehindNode<PK, D, N> {
    private static final Logger logger = LoggerFactory.getLogger(BaseWriteBehindNode.class);
    /** Initial delay and delay between two queue flushes, in milliseconds. */
    public static final int FLUSH_RATE_MS = 500;
    /** Maximum number of objects merged into a single submitted write. */
    private static final int FLUSH_BATCH_SIZE = 100;
    /** Delay between two runs of the {@link OverdueWriteCanceller}, in milliseconds. */
    private static final long OVERDUE_CHECK_RATE_MS = 1000L;
    private static final long DEFAULT_TIMEOUT_MS = Duration.ofMinutes(1L).toMillis();

    protected final N backingNode;
    /** Set to one minute; not read in this class (presumably consumed by {@link OverdueWriteCanceller}). */
    protected final long timeoutMs;
    /** Writes that have been submitted to the executor, in submission order. */
    protected final Queue<OutstandingWriteWrapper> outstandingWrites;
    private final ExecutorService writeExecutor;
    private final Queue<WriteWrapper<?>> queue;
    private final QueueFlusher queueFlusher;

    /**
     * Creates the node and schedules its two background tasks on the Datarouter write-behind scheduler.
     *
     * @param datarouter source of the write-behind executor and scheduler
     * @param backingNode node that ultimately receives the writes
     * @throws NullPointerException if {@code backingNode} is null
     */
    public BaseWriteBehindNode(Datarouter datarouter, N backingNode) {
        Objects.requireNonNull(backingNode, "backingNode cannot be null.");
        this.backingNode = backingNode;
        this.writeExecutor = datarouter.getWriteBehindExecutor();
        this.timeoutMs = DEFAULT_TIMEOUT_MS;
        this.outstandingWrites = new ConcurrentLinkedQueue<>();
        this.queue = new LinkedBlockingQueue<>();
        this.queueFlusher = new QueueFlusher();

        // Schedule only after every field is assigned: both tasks hold a reference to this instance and the
        // canceller starts with no delay, so it must never observe a partially constructed node.
        ScheduledExecutorService scheduler = datarouter.getWriteBehindScheduler();
        scheduler.scheduleWithFixedDelay(new OverdueWriteCanceller(this), 0L, OVERDUE_CHECK_RATE_MS,
                TimeUnit.MILLISECONDS);
        scheduler.scheduleWithFixedDelay(this.queueFlusher, FLUSH_RATE_MS, FLUSH_RATE_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * @return the live queue of pending writes that the flusher drains
     */
    @Override
    public Queue<WriteWrapper<?>> getQueue() {
        return this.queue;
    }

    /**
     * @return the node that buffered writes are applied to
     */
    @Override
    public N getBackingNode() {
        return this.backingNode;
    }

    /**
     * Drains the queue immediately on the calling thread, then waits (via {@code FutureTool.get}) for every write
     * that this drain submitted.
     */
    public void flush() {
        this.queueFlusher.flushQueue().forEach(FutureTool::get);
    }

    // Accessor carried over from the decompiled class file (marked synthetic there); kept so the set of
    // declared members is unchanged. Code in this file reads the logger field directly.
    static /* synthetic */ Logger access$0() {
        return logger;
    }

    /**
     * Periodic task that drains the pending-write queue and hands batches to the write executor.
     */
    private class QueueFlusher
    implements Runnable {
        /** Batch currently being accumulated; null when no batch is open. Guarded by {@code flushQueue}'s lock. */
        private WriteWrapper<Object> previousWriteWrapper;

        private QueueFlusher() {
        }

        @Override
        public void run() {
            try {
                List<Future<?>> futures = this.flushQueue();
                logger.info("Futures submited count={} node={}", futures.size(), backingNode);
            }
            catch (Throwable t) {
                // Catch everything: an exception escaping a task scheduled with scheduleWithFixedDelay
                // suppresses all of its subsequent executions.
                logger.error("Failed to flush queue for {}", backingNode, t);
            }
        }

        /**
         * Drains the queue and submits its content to the write executor.
         *
         * <p>A wrapper that carries a config is submitted on its own. Wrappers without a config are merged with
         * the preceding ones while the op stays the same, and cut into batches of {@code FLUSH_BATCH_SIZE}
         * objects. Synchronized because it is reachable both from the scheduler and from {@link #flush()}, and
         * it mutates {@code previousWriteWrapper}.
         *
         * @return the futures of the writes that were actually submitted (never contains null)
         */
        private synchronized List<Future<?>> flushQueue() {
            List<Future<?>> futures = new ArrayList<>();
            this.previousWriteWrapper = null;
            WriteWrapper<?> writeWrapper;
            // poll() until null rather than isEmpty()+poll(): the queue is exposed through getQueue(), so another
            // consumer could empty it between the two calls.
            while ((writeWrapper = queue.poll()) != null) {
                boolean hasConfig = writeWrapper.getConfig() != null;
                // An open batch cannot absorb a wrapper with a different op or with its own config: close it.
                if (this.previousWriteWrapper != null
                        && (!writeWrapper.getOp().equals(this.previousWriteWrapper.getOp()) || hasConfig)) {
                    this.addIfSubmitted(futures, this.handlePrevious());
                }
                if (hasConfig) {
                    this.addIfSubmitted(futures, this.handleWriteWrapper(writeWrapper));
                    continue;
                }
                this.appendInBatches(writeWrapper, futures);
            }
            if (this.previousWriteWrapper != null) {
                this.addIfSubmitted(futures, this.handlePrevious());
            }
            return futures;
        }

        /**
         * Appends the objects of a config-less wrapper to the open batch, submitting the batch each time it
         * reaches {@code FLUSH_BATCH_SIZE} objects. A partially filled batch is left open for the next wrapper.
         */
        private void appendInBatches(WriteWrapper<?> writeWrapper, List<Future<?>> futures) {
            List<?> objects = ListTool.asList(writeWrapper.getObjects());
            int offset = 0;
            // do/while: a batch is opened even for a wrapper without objects, as later wrappers may merge into it.
            do {
                if (this.previousWriteWrapper == null) {
                    this.previousWriteWrapper = new WriteWrapper<>(writeWrapper.getOp(), Collections.emptyList(),
                            null);
                }
                Collection<Object> batch = this.previousWriteWrapper.getObjects();
                int end = Math.min(offset + FLUSH_BATCH_SIZE - batch.size(), objects.size());
                batch.addAll(objects.subList(offset, end));
                offset = end;
                if (batch.size() == FLUSH_BATCH_SIZE) {
                    this.addIfSubmitted(futures, this.handlePrevious());
                }
            } while (offset < objects.size());
        }

        /** Records a future unless it is null, which is what {@code handleWriteWrapper} returns for empty writes. */
        private void addIfSubmitted(List<Future<?>> futures, Future<?> future) {
            if (future != null) {
                futures.add(future);
            }
        }

        /**
         * Submits the open batch and closes it.
         *
         * @return the future of the submitted write, or null if the batch held no objects
         */
        private Future<?> handlePrevious() {
            Future<?> future = this.handleWriteWrapper(this.previousWriteWrapper);
            this.previousWriteWrapper = null;
            return future;
        }

        /**
         * Submits one write to the executor and registers it in {@code outstandingWrites}.
         *
         * @param writeWrapper the write to perform
         * @return the future of the submitted task, or null if the wrapper holds no objects
         */
        private Future<?> handleWriteWrapper(WriteWrapper<?> writeWrapper) {
            Collection<?> databeans = writeWrapper.getObjects();
            if (CollectionTool.isEmpty(databeans)) {
                return null;
            }
            String opDesc = String.format("%s with %s %s", writeWrapper.getOp(), databeans.size(),
                    CollectionTool.getFirst(databeans).getClass().getSimpleName());
            // The task works on a clone of the wrapper rather than on the instance still referenced here.
            WriteWrapper<?> writeWrapperClone = (WriteWrapper<?>)writeWrapper.clone();
            Future<?> future = writeExecutor.submit(() -> {
                try {
                    if (!handleWriteWrapperInternal(writeWrapperClone)) {
                        logger.error("unhandled op desc={}", opDesc);
                    }
                }
                catch (Throwable t) {
                    // Failures are logged, not propagated, so the future completes normally either way.
                    logger.error("opDesc={}", opDesc, t);
                }
            });
            outstandingWrites.add(new OutstandingWriteWrapper(System.currentTimeMillis(), future, opDesc));
            return future;
        }
    }
}

