/*
 * Decompiled with CFR 0.152.
 */
package io.atomix.core.workqueue.impl;

import com.google.common.collect.ImmutableList;
import io.atomix.core.workqueue.AsyncWorkQueue;
import io.atomix.core.workqueue.Task;
import io.atomix.core.workqueue.WorkQueue;
import io.atomix.core.workqueue.WorkQueueStats;
import io.atomix.core.workqueue.impl.BlockingWorkQueue;
import io.atomix.core.workqueue.impl.WorkQueueClient;
import io.atomix.core.workqueue.impl.WorkQueueService;
import io.atomix.primitive.AbstractAsyncPrimitive;
import io.atomix.primitive.PrimitiveRegistry;
import io.atomix.primitive.PrimitiveState;
import io.atomix.primitive.proxy.ProxyClient;
import io.atomix.utils.concurrent.AbstractAccumulator;
import io.atomix.utils.concurrent.Accumulator;
import io.atomix.utils.concurrent.Threads;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Asynchronous work queue backed by a {@link ProxyClient} for {@link WorkQueueService}.
 *
 * <p>Queue operations ({@code add}, {@code take}, {@code complete}, {@code stats}) are forwarded to
 * the service for the partition selected by this primitive's name. A task processor can be
 * registered to have tasks pulled and executed automatically: tasks are taken in batches bounded by
 * the processor's remaining head room, run on the caller-supplied executor, and their ids are
 * batched through an accumulator before being reported as complete.
 */
public class WorkQueueProxy
        extends AbstractAsyncPrimitive<AsyncWorkQueue<byte[]>, WorkQueueService>
        implements AsyncWorkQueue<byte[]>, WorkQueueClient {

    /** Maximum number of completed task ids accumulated before a batch is flushed. */
    private static final int MAX_COMPLETION_BATCH_SIZE = 50;

    /** Maximum number of milliseconds a completion batch is held before being flushed. */
    private static final int MAX_COMPLETION_BATCH_MILLIS = 50;

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Single-threaded executor on which results of background {@code take} calls are dispatched. */
    private final ExecutorService executor;

    /** Currently installed task processor, or {@code null} when automatic processing is inactive. */
    private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();

    /** Timer handed to the accumulator that batches completed task ids. */
    private final Timer timer = new Timer("atomix-work-queue-completer");

    /** Whether this client has successfully registered with the service as a task processor. */
    private final AtomicBoolean isRegistered = new AtomicBoolean(false);

    /**
     * Creates a new work queue proxy.
     *
     * @param proxy    the proxy client used to reach the work queue service
     * @param registry the primitive registry passed through to the superclass
     */
    public WorkQueueProxy(ProxyClient<WorkQueueService> proxy, PrimitiveRegistry registry) {
        super(proxy, registry);
        this.executor = Executors.newSingleThreadExecutor(
                Threads.namedThreads("atomix-work-queue-" + proxy.name() + "-%d", log));
    }

    /**
     * {@link WorkQueueClient} callback; attempts to pull more work for the registered processor.
     */
    @Override
    public void taskAvailable() {
        resumeWork();
    }

    /**
     * Adds the given items to the queue.
     *
     * @param items the payloads to add; an empty collection completes immediately without a
     *              service call
     * @return a future completed once the service has accepted the items
     */
    @Override
    public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
        if (items.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return getProxyClient().acceptBy(name(), service -> service.add(items));
    }

    /**
     * Takes up to {@code maxTasks} tasks from the queue.
     *
     * @param maxTasks the maximum number of tasks to take; a non-positive value yields an empty
     *                 result without a service call
     * @return a future completed with the tasks returned by the service
     */
    @Override
    public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
        if (maxTasks <= 0) {
            return CompletableFuture.completedFuture(ImmutableList.of());
        }
        return getProxyClient().applyBy(name(), service -> service.take(maxTasks));
    }

    /**
     * Reports the given tasks as complete.
     *
     * @param taskIds ids of the completed tasks; an empty collection completes immediately
     *                without a service call
     * @return a future completed once the service has accepted the completion
     */
    @Override
    public CompletableFuture<Void> complete(Collection<String> taskIds) {
        if (taskIds.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return getProxyClient().acceptBy(name(), service -> service.complete(taskIds));
    }

    /**
     * Installs a task processor, registers with the service, and takes an initial batch of up to
     * {@code parallelism} tasks.
     *
     * @param callback    consumer invoked with each task payload
     * @param parallelism initial head room, i.e. the maximum number of tasks outstanding at once
     * @param executor    executor on which {@code callback} is run
     * @return a future completed once the initial batch has been handed to the processor
     */
    @Override
    public CompletableFuture<Void> registerTaskProcessor(
            Consumer<byte[]> callback, int parallelism, Executor executor) {
        Accumulator<String> completedTaskAccumulator = new CompletedTaskAccumulator(
                timer, MAX_COMPLETION_BATCH_SIZE, MAX_COMPLETION_BATCH_MILLIS);
        TaskProcessor processor =
                new TaskProcessor(callback, parallelism, executor, completedTaskAccumulator);
        taskProcessor.set(processor);
        return register()
                .thenCompose(v -> take(parallelism))
                .thenAccept(processor);
    }

    /**
     * Unregisters from the service and, once that succeeds, removes the installed task processor so
     * that tasks still in flight do not pull further work when they finish.
     *
     * @return a future completed once unregistration has been acknowledged
     */
    @Override
    public CompletableFuture<Void> stopProcessing() {
        TaskProcessor stopped = taskProcessor.get();
        // compareAndSet: do not discard a processor that was registered after this call started.
        return unregister().thenRun(() -> taskProcessor.compareAndSet(stopped, null));
    }

    /**
     * Fetches queue statistics from the service.
     *
     * @return a future completed with the statistics returned by the service
     */
    @Override
    public CompletableFuture<WorkQueueStats> stats() {
        return getProxyClient().applyBy(name(), service -> service.stats());
    }

    /**
     * Takes as many tasks as the active processor has head room for and hands them to it on the
     * internal executor. Does nothing when no processor is installed.
     */
    private void resumeWork() {
        TaskProcessor activeProcessor = taskProcessor.get();
        if (activeProcessor == null) {
            return;
        }
        take(activeProcessor.headRoom()).whenCompleteAsync((tasks, error) -> {
            if (error != null) {
                // No tasks were obtained; surface the failure instead of dropping it silently.
                log.warn("Failed to take tasks from work queue {}", name(), error);
                return;
            }
            activeProcessor.accept(tasks);
        }, executor);
    }

    /** Registers with the service and records the registration once it succeeds. */
    private CompletableFuture<Void> register() {
        return getProxyClient()
                .acceptBy(name(), service -> service.register())
                .thenRun(() -> isRegistered.set(true));
    }

    /** Unregisters from the service and clears the registration flag once it succeeds. */
    private CompletableFuture<Void> unregister() {
        return getProxyClient()
                .acceptBy(name(), service -> service.unregister())
                .thenRun(() -> isRegistered.set(false));
    }

    /**
     * Connects the primitive and its partition, then installs a state listener that re-sends the
     * registration whenever the partition reports {@link PrimitiveState#CONNECTED} while this
     * client is marked as registered.
     *
     * @return a future completed with this work queue once connected
     */
    @Override
    public CompletableFuture<AsyncWorkQueue<byte[]>> connect() {
        return super.connect()
                .thenCompose(v -> getProxyClient().getPartition(name()).connect())
                .thenRun(() -> getProxyClient().getPartition(name()).addStateChangeListener(state -> {
                    if (state == PrimitiveState.CONNECTED && isRegistered.get()) {
                        getProxyClient().acceptBy(name(), service -> service.register());
                    }
                }))
                .thenApply(v -> this);
    }

    /**
     * Releases local resources (task processor, internal executor, completion timer) and then
     * delegates to the superclass.
     *
     * @return the future returned by the superclass {@code delete()}
     */
    @Override
    public CompletableFuture<Void> delete() {
        // Drop the processor first so tasks finishing later do not try to schedule work on the
        // executor that is about to be shut down.
        taskProcessor.set(null);
        executor.shutdown();
        timer.cancel();
        return super.delete();
    }

    /**
     * Returns a blocking view of this queue.
     *
     * @param operationTimeout timeout passed to the blocking wrapper, in millisecond resolution
     * @return a {@link BlockingWorkQueue} wrapping this instance
     */
    @Override
    public WorkQueue<byte[]> sync(Duration operationTimeout) {
        return new BlockingWorkQueue<byte[]>(this, operationTimeout.toMillis());
    }

    /**
     * Dispatches taken tasks to the user callback while tracking how many more tasks may be
     * outstanding ("head room").
     */
    private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
        /** Number of additional tasks that may currently be taken. */
        private final AtomicInteger headRoom;
        private final Consumer<byte[]> backingConsumer;
        private final Executor executor;
        private final Accumulator<String> taskCompleter;

        TaskProcessor(Consumer<byte[]> backingConsumer,
                      int parallelism,
                      Executor executor,
                      Accumulator<String> taskCompleter) {
            this.backingConsumer = backingConsumer;
            this.headRoom = new AtomicInteger(parallelism);
            this.executor = executor;
            this.taskCompleter = taskCompleter;
        }

        /** Returns the current head room. */
        int headRoom() {
            return headRoom.get();
        }

        /**
         * Schedules each task on the processor's executor.
         *
         * @param tasks the tasks to run; {@code null} is ignored
         */
        @Override
        public void accept(Collection<Task<byte[]>> tasks) {
            if (tasks == null) {
                return;
            }
            // Reserve capacity for the whole batch up front; each task gives its slot back below.
            headRoom.addAndGet(-tasks.size());
            tasks.forEach(task -> executor.execute(() -> {
                try {
                    backingConsumer.accept(task.payload());
                    // Only reached when the callback returned normally.
                    taskCompleter.add(task.taskId());
                } catch (Exception e) {
                    // Failed tasks are deliberately not reported as complete.
                    log.debug("Task execution failed", e);
                } finally {
                    headRoom.incrementAndGet();
                    resumeWork();
                }
            }));
        }
    }

    /**
     * Accumulator that batches completed task ids and reports each batch through
     * {@link WorkQueueProxy#complete(Collection)}.
     */
    private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
        CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
            super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
        }

        /**
         * Reports the batched task ids as complete, logging any failure of the request.
         *
         * @param items ids of the tasks to report as complete
         */
        @Override
        public void processItems(List<String> items) {
            complete(items).whenComplete((result, error) -> {
                if (error != null) {
                    log.warn("Failed to complete {} task(s) on work queue {}",
                            items.size(), name(), error);
                }
            });
        }
    }
}

