/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.aggregation;

import io.datakernel.aggregation.AggregationChunk;
import io.datakernel.aggregation.AggregationChunkStorage;
import io.datakernel.aggregation.PrimaryKey;
import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.aggregation.util.PartitionPredicate;
import io.datakernel.async.process.AsyncCollector;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.datastream.ForwardingStreamConsumer;
import io.datakernel.datastream.ForwardingStreamSupplier;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamConsumerSwitcher;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.promise.Promise;
import io.datakernel.promise.SettablePromise;
import java.util.ArrayList;
import java.util.List;
import org.jetbrains.annotations.NotNull;

public final class AggregationChunker<C, T>
extends ForwardingStreamConsumer<T> {
    private final StreamConsumerSwitcher<T> switcher;
    private final SettablePromise<List<AggregationChunk>> result = new SettablePromise();
    private final AggregationStructure aggregation;
    private final List<String> fields;
    private final Class<T> recordClass;
    private final PartitionPredicate<T> partitionPredicate;
    private final AggregationChunkStorage<C> storage;
    private final AsyncCollector<? extends List<AggregationChunk>> chunksCollector;
    private final DefiningClassLoader classLoader;
    private final int chunkSize;

    private AggregationChunker(StreamConsumerSwitcher<T> switcher, AggregationStructure aggregation, List<String> fields, Class<T> recordClass, PartitionPredicate<T> partitionPredicate, AggregationChunkStorage<C> storage, DefiningClassLoader classLoader, int chunkSize) {
        super(switcher);
        this.switcher = switcher;
        this.aggregation = aggregation;
        this.fields = fields;
        this.recordClass = recordClass;
        this.partitionPredicate = partitionPredicate;
        this.storage = storage;
        this.classLoader = classLoader;
        this.chunksCollector = AsyncCollector.create(new ArrayList());
        this.chunksCollector.run(switcher.getAcknowledgement());
        this.chunkSize = chunkSize;
        this.chunksCollector.get().whenComplete((arg_0, arg_1) -> this.result.trySet(arg_0, arg_1));
        this.getAcknowledgement().whenException(arg_0 -> this.result.trySetException(arg_0));
    }

    public static <C, T> AggregationChunker<C, T> create(AggregationStructure aggregation, List<String> fields, Class<T> recordClass, PartitionPredicate<T> partitionPredicate, AggregationChunkStorage<C> storage, DefiningClassLoader classLoader, int chunkSize) {
        StreamConsumerSwitcher switcher = StreamConsumerSwitcher.create();
        AggregationChunker<C, T> chunker = new AggregationChunker<C, T>(switcher, aggregation, fields, recordClass, partitionPredicate, storage, classLoader, chunkSize);
        super.startNewChunk();
        return chunker;
    }

    public Promise<List<AggregationChunk>> getResult() {
        return this.result;
    }

    private void startNewChunk() {
        StreamConsumer consumer = StreamConsumer.ofPromise((Promise)this.storage.createId().then(chunkId -> this.storage.write(this.aggregation, this.fields, this.recordClass, chunkId, this.classLoader).map(streamConsumer -> {
            ChunkWriter chunkWriter = new ChunkWriter(streamConsumer, chunkId, this.chunkSize, this.partitionPredicate);
            this.chunksCollector.addPromise(chunkWriter.getResult(), (accumulator, newChunk) -> {
                if (newChunk != null && newChunk.getCount() != 0) {
                    accumulator.add(newChunk);
                }
            });
            return chunkWriter.withLateBinding();
        })));
        this.switcher.switchTo(consumer);
    }

    private class ChunkWriter
    extends ForwardingStreamConsumer<T>
    implements StreamDataAcceptor<T> {
        private final SettablePromise<AggregationChunk> result;
        private final int chunkSize;
        private final PartitionPredicate<T> partitionPredicate;
        private StreamDataAcceptor<T> dataAcceptor;
        private T first;
        private T last;
        private int count;
        boolean switched;

        public ChunkWriter(StreamConsumer<T> actualConsumer, C chunkId, int chunkSize, PartitionPredicate<T> partitionPredicate) {
            super(actualConsumer);
            this.result = new SettablePromise();
            this.chunkSize = chunkSize;
            this.partitionPredicate = partitionPredicate;
            actualConsumer.getAcknowledgement().map($ -> this.count == 0 ? null : AggregationChunk.create(chunkId, AggregationChunker.this.fields, PrimaryKey.ofObject(this.first, AggregationChunker.this.aggregation.getKeys()), PrimaryKey.ofObject(this.last, AggregationChunker.this.aggregation.getKeys()), this.count)).whenComplete((arg_0, arg_1) -> this.result.trySet(arg_0, arg_1));
            this.getAcknowledgement().whenException(arg_0 -> this.result.trySetException(arg_0));
        }

        public void setSupplier(@NotNull StreamSupplier<T> supplier) {
            super.setSupplier((StreamSupplier)new ForwardingStreamSupplier<T>(supplier){

                public void resume(@NotNull StreamDataAcceptor<T> dataAcceptor) {
                    ChunkWriter.this.dataAcceptor = dataAcceptor;
                    super.resume((StreamDataAcceptor)ChunkWriter.this);
                }
            });
        }

        public void accept(T item) {
            if (this.first == null) {
                this.first = item;
            }
            this.last = item;
            this.dataAcceptor.accept(item);
            if ((++this.count == this.chunkSize || this.partitionPredicate != null && !this.partitionPredicate.isSamePartition(this.last, item)) && !this.switched) {
                this.switched = true;
                AggregationChunker.this.startNewChunk();
            }
        }

        public Promise<AggregationChunk> getResult() {
            return this.result;
        }
    }
}

