/*
 * Decompiled with CFR 0.152.
 */
package io.activej.aggregation;

import io.activej.aggregation.AggregationChunk;
import io.activej.aggregation.AggregationChunkStorage;
import io.activej.aggregation.PrimaryKey;
import io.activej.aggregation.ot.AggregationStructure;
import io.activej.aggregation.util.PartitionPredicate;
import io.activej.async.AsyncAccumulator;
import io.activej.codegen.DefiningClassLoader;
import io.activej.datastream.ForwardingStreamConsumer;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamConsumerSwitcher;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.ArrayList;
import java.util.List;

/**
 * Stream consumer that splits an incoming stream of records into chunks which are
 * written through an {@link AggregationChunkStorage}.
 *
 * <p>All incoming data is forwarded to an internal {@link StreamConsumerSwitcher}.
 * Each time a new chunk is started, a fresh chunk id is requested from the storage,
 * a writer for that id is opened, and the switcher is redirected to it. The
 * {@link AggregationChunk} descriptors produced by the individual writers are
 * gathered in an {@link AsyncAccumulator} and exposed through {@link #getResult()}.
 *
 * @param <C> type of the chunk id issued by the storage
 * @param <T> type of the records being consumed
 */
public final class AggregationChunker<C, T>
extends ForwardingStreamConsumer<T> {
    private final StreamConsumerSwitcher<T> switcher;
    private final SettablePromise<List<AggregationChunk>> result = new SettablePromise<>();
    private final AggregationStructure aggregation;
    private final List<String> fields;
    private final Class<T> recordClass;
    private final PartitionPredicate<T> partitionPredicate;
    private final AggregationChunkStorage<C> storage;
    private final AsyncAccumulator<List<AggregationChunk>> chunksAccumulator;
    private final DefiningClassLoader classLoader;
    private final int chunkSize;

    /**
     * Wires the chunker to the given switcher and prepares the accumulator of finished chunks.
     * The {@link #result} promise is completed with whatever
     * {@code chunksAccumulator.run(getAcknowledgement())} completes with (value or exception).
     * No chunk is started here; {@link #create} does that after construction.
     */
    private AggregationChunker(StreamConsumerSwitcher<T> switcher, AggregationStructure aggregation, List<String> fields, Class<T> recordClass, PartitionPredicate<T> partitionPredicate, AggregationChunkStorage<C> storage, DefiningClassLoader classLoader, int chunkSize) {
        super(switcher);
        this.switcher = switcher;
        this.aggregation = aggregation;
        this.fields = fields;
        this.recordClass = recordClass;
        this.partitionPredicate = partitionPredicate;
        this.storage = storage;
        this.classLoader = classLoader;
        this.chunkSize = chunkSize;
        this.chunksAccumulator = AsyncAccumulator.create(new ArrayList<>());
        this.chunksAccumulator.run(this.getAcknowledgement())
                .whenComplete((chunks, e) -> this.result.trySet(chunks, e));
    }

    /**
     * Creates a chunker and immediately starts its first chunk.
     *
     * @param aggregation        structure handed to the storage and used to obtain the key list
     * @param fields             field names handed to the storage and recorded in every chunk
     * @param recordClass        class of the consumed records
     * @param partitionPredicate optional predicate consulted after each item; may be {@code null}
     * @param storage            storage that issues chunk ids and opens chunk writers
     * @param classLoader        class loader handed to the storage when opening a writer
     * @param chunkSize          number of items after which a new chunk is started
     * @return a chunker whose switcher already points at the first chunk writer
     */
    public static <C, T> AggregationChunker<C, T> create(AggregationStructure aggregation, List<String> fields, Class<T> recordClass, PartitionPredicate<T> partitionPredicate, AggregationChunkStorage<C> storage, DefiningClassLoader classLoader, int chunkSize) {
        StreamConsumerSwitcher<T> switcher = StreamConsumerSwitcher.create();
        AggregationChunker<C, T> chunker = new AggregationChunker<>(switcher, aggregation, fields, recordClass, partitionPredicate, storage, classLoader, chunkSize);
        // Must be invoked on the freshly built instance: `super` is not available in a
        // static method, and startNewChunk() is declared on this class, not on its parent.
        chunker.startNewChunk();
        return chunker;
    }

    /**
     * Returns the promise of the list of chunks collected by this chunker.
     * Chunks that are {@code null} or report a zero count are not included.
     */
    public Promise<List<AggregationChunk>> getResult() {
        return this.result;
    }

    /**
     * Requests a new chunk id from the storage, opens a writer for it, registers the
     * writer's result with the accumulator and redirects the switcher to the new writer.
     */
    private void startNewChunk() {
        Promise<ChunkWriter> nextWriter = this.storage.createId()
                .then(chunkId -> this.storage.write(this.aggregation, this.fields, this.recordClass, chunkId, this.classLoader)
                        .map(streamConsumer -> new ChunkWriter(streamConsumer, chunkId, this.chunkSize, this.partitionPredicate))
                        .whenResult(chunkWriter -> this.chunksAccumulator.addPromise(chunkWriter.getResult(), (chunks, newChunk) -> {
                            // Writers that received no items yield null; skip those and empty chunks.
                            if (newChunk != null && newChunk.getCount() != 0) {
                                chunks.add(newChunk);
                            }
                        })));
        this.switcher.switchTo(StreamConsumer.ofPromise(nextWriter));
    }

    /**
     * Consumer for a single chunk. It wraps the storage-provided consumer, remembers the
     * first and last item and the item count, and asks the enclosing chunker to start a
     * new chunk once the split condition in {@link #accept} is met.
     */
    private class ChunkWriter
    extends ForwardingStreamConsumer<T>
    implements StreamDataAcceptor<T> {
        private final SettablePromise<AggregationChunk> result = new SettablePromise<>();
        private final int chunkSize;
        private final PartitionPredicate<T> partitionPredicate;
        private StreamDataAcceptor<T> dataAcceptor;
        private T first;
        private T last;
        private int count;

        /**
         * @param actualConsumer     consumer that actually writes the chunk data
         * @param chunkId            id recorded in the resulting {@link AggregationChunk}
         * @param chunkSize          item count at which a new chunk is requested
         * @param partitionPredicate optional predicate consulted after each item; may be {@code null}
         */
        public ChunkWriter(StreamConsumer<T> actualConsumer, C chunkId, int chunkSize, PartitionPredicate<T> partitionPredicate) {
            super(actualConsumer);
            this.chunkSize = chunkSize;
            this.partitionPredicate = partitionPredicate;
            // Once the underlying consumer acknowledges, publish the chunk descriptor,
            // or null when this writer never received an item.
            actualConsumer.getAcknowledgement()
                    .map(ignored -> this.count == 0
                            ? null
                            : AggregationChunk.create(chunkId, AggregationChunker.this.fields, PrimaryKey.ofObject(this.first, AggregationChunker.this.aggregation.getKeys()), PrimaryKey.ofObject(this.last, AggregationChunker.this.aggregation.getKeys()), this.count))
                    .whenComplete((chunk, e) -> this.result.trySet(chunk, e));
        }

        /**
         * Caches the acceptor of the wrapped consumer and, when one is available, returns
         * this writer in its place so that every item passes through {@link #accept};
         * returns {@code null} when the wrapped consumer has no acceptor.
         */
        public StreamDataAcceptor<T> getDataAcceptor() {
            this.dataAcceptor = super.getDataAcceptor();
            return this.dataAcceptor != null ? this : null;
        }

        /**
         * Records the item as first/last of the chunk, forwards it to the wrapped acceptor
         * and starts a new chunk when the split condition holds.
         */
        public void accept(T item) {
            // A null `first` marks "no item seen yet".
            if (this.first == null) {
                this.first = item;
            }
            this.last = item;
            this.dataAcceptor.accept(item);
            // NOTE(review): `last` was assigned `item` just above, so the predicate below is
            // evaluated on (item, item) and cannot detect a partition change unless the
            // predicate is non-reflexive. In practice only the chunkSize check splits chunks.
            // Comparing against the previous item would also require routing the current
            // item to the next chunk - confirm the intended behaviour before changing this.
            if (++this.count == this.chunkSize || this.partitionPredicate != null && !this.partitionPredicate.isSamePartition(this.last, item)) {
                AggregationChunker.this.startNewChunk();
            }
        }

        /**
         * Returns the promise of this writer's chunk descriptor; it resolves to {@code null}
         * when no items were written.
         */
        public Promise<AggregationChunk> getResult() {
            return this.result;
        }
    }
}

