/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.aggregation;

import io.datakernel.aggregation.Aggregate;
import io.datakernel.aggregation.AggregationChunk;
import io.datakernel.aggregation.AggregationChunkStorage;
import io.datakernel.aggregation.AggregationChunker;
import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.aggregation.util.PartitionPredicate;
import io.datakernel.async.process.AsyncCollector;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.datastream.AbstractStreamConsumer;
import io.datakernel.datastream.StreamDataAcceptor;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.promise.Promise;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream consumer that groups incoming records by key in an in-memory map,
 * folding records with the same key into a single accumulator, and hands the
 * accumulated values (sorted by key) to an {@link AggregationChunker} whenever
 * the number of distinct keys reaches {@code chunkSize}, on an explicit
 * {@link #flush()}, or at end of stream.
 *
 * <p>The chunks produced by all flushes are gathered in an
 * {@link AsyncCollector}; the combined list is available via {@link #getResult()}.
 *
 * @param <C> chunk id type used by the {@link AggregationChunkStorage}
 * @param <T> type of the consumed records (accumulators are streamed to the chunker as {@code T})
 * @param <K> type of the grouping key; keys are ordered with {@code compareTo} before flushing
 */
public final class AggregationGroupReducer<C, T, K extends Comparable>
        extends AbstractStreamConsumer<T>
        implements StreamDataAcceptor<T> {
    private static final Logger logger = LoggerFactory.getLogger(AggregationGroupReducer.class);

    /** Upstream is suspended while more than this many flushes are still in flight. */
    private static final int MAX_ACTIVE_FLUSHES = 2;

    private final AggregationChunkStorage<C> storage;
    private final AggregationStructure aggregation;
    private final List<String> measures;
    private final PartitionPredicate<T> partitionPredicate;
    private final Class<T> recordClass;
    private final Function<T, K> keyFunction;
    private final Aggregate<T, Object> aggregate;
    private final AsyncCollector<List<AggregationChunk>> chunksCollector;
    private final DefiningClassLoader classLoader;
    private final int chunkSize;

    /** Accumulators of the current (not yet flushed) group, indexed by key. */
    private final HashMap<K, Object> map = new HashMap<>();

    /**
     * Creates a reducer.
     *
     * @param storage            storage passed to every {@link AggregationChunker} created on flush
     * @param aggregation        aggregation structure passed to the chunker and used by {@link #toString()}
     * @param measures           measures passed to the chunker
     * @param recordClass        record class passed to the chunker
     * @param partitionPredicate partition predicate passed to the chunker
     * @param keyFunction        extracts the grouping key from an incoming record
     * @param aggregate          creates accumulators and folds records into them
     * @param chunkSize          number of distinct keys that triggers an automatic flush;
     *                           also passed to the chunker
     * @param classLoader        class loader passed to the chunker
     */
    public AggregationGroupReducer(@NotNull AggregationChunkStorage<C> storage, @NotNull AggregationStructure aggregation, @NotNull List<String> measures, @NotNull Class<T> recordClass, @NotNull PartitionPredicate<T> partitionPredicate, @NotNull Function<T, K> keyFunction, @NotNull Aggregate<T, Object> aggregate, int chunkSize, @NotNull DefiningClassLoader classLoader) {
        this.storage = storage;
        this.measures = measures;
        this.partitionPredicate = partitionPredicate;
        this.recordClass = recordClass;
        this.keyFunction = keyFunction;
        this.aggregate = aggregate;
        this.chunkSize = chunkSize;
        this.aggregation = aggregation;
        this.chunksCollector = AsyncCollector.create(new ArrayList<>());
        this.classLoader = classLoader;
    }

    /**
     * Returns the collector's promise of the chunks gathered from all flushes.
     *
     * @return promise of the accumulated chunk list
     */
    public Promise<List<AggregationChunk>> getResult() {
        return this.chunksCollector.get();
    }

    /**
     * Consumes one record: folds it into the accumulator registered for its key,
     * or creates a new accumulator when the key has not been seen since the last
     * flush. A flush is triggered when a newly added key brings the number of
     * buffered keys to exactly {@code chunkSize}.
     *
     * @param item the incoming record
     */
    public void accept(T item) {
        K key = this.keyFunction.apply(item);
        Object accumulator = this.map.get(key);
        if (accumulator != null) {
            this.aggregate.accumulate(accumulator, item);
            return;
        }
        this.map.put(key, this.aggregate.createAccumulator(item));
        // Only a newly inserted key can grow the map, so the size check lives here.
        if (this.map.size() == this.chunkSize) {
            this.doFlush();
        }
    }

    /** Starts consuming by resuming the upstream supplier with this instance as the acceptor. */
    protected void onStarted() {
        this.getSupplier().resume(this);
    }

    /**
     * Moves the buffered accumulators out of the map, sorts them by key and
     * streams them into a fresh {@link AggregationChunker}; the chunker's result
     * is registered with the chunks collector. Does nothing when the buffer is empty.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void doFlush() {
        if (this.map.isEmpty()) {
            return;
        }
        this.suspendOrResume();

        // Snapshot the entries before clearing so the map can be reused immediately.
        List<Map.Entry<K, Object>> entries = new ArrayList<>(this.map.entrySet());
        this.map.clear();
        // K is bounded by raw Comparable, hence the unchecked compareTo call.
        entries.sort((left, right) -> left.getKey().compareTo(right.getKey()));

        List<T> accumulators = new ArrayList<>(entries.size());
        for (Map.Entry<K, Object> entry : entries) {
            // Accumulators are streamed to the chunker typed as T, as in the original raw-typed code.
            accumulators.add((T) entry.getValue());
        }

        StreamSupplier<T> supplier = StreamSupplier.ofIterable(accumulators);
        AggregationChunker<C, T> chunker = AggregationChunker.create(this.aggregation, this.measures, this.recordClass, this.partitionPredicate, this.storage, this.classLoader, this.chunkSize);
        this.chunksCollector
                .addPromise(supplier.streamTo(chunker).then($ -> chunker.getResult()), List::addAll)
                // Re-evaluate back-pressure once this flush has completed.
                .whenResult($ -> this.suspendOrResume());
    }

    /**
     * Applies back-pressure: suspends the upstream supplier while the collector
     * reports more than {@link #MAX_ACTIVE_FLUSHES} active promises, otherwise
     * resumes it with this instance as the acceptor.
     */
    private void suspendOrResume() {
        if (this.chunksCollector.getActivePromises() > MAX_ACTIVE_FLUSHES) {
            logger.trace("Suspend group reduce: {}", this);
            this.getSupplier().suspend();
        } else {
            logger.trace("Resume group reduce: {}", this);
            this.getSupplier().resume(this);
        }
    }

    /**
     * Flushes whatever is still buffered, then runs the collector and returns
     * its promise with the value discarded.
     *
     * @return promise derived from the collector's result, converted to {@code Void}
     */
    protected Promise<Void> onEndOfStream() {
        this.doFlush();
        // NOTE(review): presumably run() lets the collector complete once all added promises finish — confirm in AsyncCollector.
        return this.chunksCollector.run().get().toVoid();
    }

    /**
     * Closes the chunks collector with the given error.
     *
     * @param e the error reported for this consumer
     */
    protected void onError(Throwable e) {
        this.chunksCollector.close(e);
    }

    /** Flushes the currently buffered accumulators, if any, regardless of {@code chunkSize}. */
    public void flush() {
        this.doFlush();
    }

    /**
     * Returns the number of keys currently buffered and not yet flushed.
     *
     * @return current size of the in-memory group map
     */
    public int getBufferSize() {
        return this.map.size();
    }

    public String toString() {
        return "AggregationGroupReducer{keys=" + this.aggregation.getKeys() + ", measures=" + this.measures + ", chunkSize=" + this.chunkSize + ", map.size=" + this.map.size() + '}';
    }
}

