/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.aggregation;

import io.datakernel.aggregation.Aggregate;
import io.datakernel.aggregation.AggregationChunk;
import io.datakernel.aggregation.AggregationChunkStorage;
import io.datakernel.aggregation.AggregationChunker;
import io.datakernel.aggregation.AggregationGroupReducer;
import io.datakernel.aggregation.AggregationPredicate;
import io.datakernel.aggregation.AggregationPredicates;
import io.datakernel.aggregation.AggregationQuery;
import io.datakernel.aggregation.AggregationState;
import io.datakernel.aggregation.AggregationStats;
import io.datakernel.aggregation.AggregationUtils;
import io.datakernel.aggregation.IAggregation;
import io.datakernel.aggregation.PrimaryKey;
import io.datakernel.aggregation.QueryPlan;
import io.datakernel.aggregation.fieldtype.FieldType;
import io.datakernel.aggregation.ot.AggregationDiff;
import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.codegen.ClassBuilder;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.codegen.Expression;
import io.datakernel.codegen.Expressions;
import io.datakernel.common.Initializable;
import io.datakernel.common.Preconditions;
import io.datakernel.common.Utils;
import io.datakernel.common.collection.CollectionUtils;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.StreamSupplierTransformer;
import io.datakernel.datastream.processor.StreamFilter;
import io.datakernel.datastream.processor.StreamMapper;
import io.datakernel.datastream.processor.StreamReducer;
import io.datakernel.datastream.processor.StreamReducers;
import io.datakernel.datastream.processor.StreamSorter;
import io.datakernel.datastream.processor.StreamSorterStorage;
import io.datakernel.datastream.processor.StreamSorterStorageImpl;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.promise.Promise;
import io.datakernel.serializer.BinarySerializer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Aggregation
implements IAggregation,
Initializable<Aggregation>,
EventloopJmxMBeanEx {
    // Defaults applied to a freshly created aggregation; each one can be overridden via the matching with* method.
    public static final int DEFAULT_CHUNK_SIZE = 1000000;
    public static final int DEFAULT_REDUCER_BUFFER_SIZE = 2000;
    public static final int DEFAULT_SORTER_ITEMS_IN_MEMORY = 1000000;
    public static final Duration DEFAULT_MAX_INCREMENTAL_RELOAD_PERIOD = Duration.ofMinutes(10L);
    public static final int DEFAULT_MAX_CHUNKS_TO_CONSOLIDATE = 1000;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Collaborators handed over at construction time.
    private final Eventloop eventloop;
    private final Executor executor;
    private final DefiningClassLoader classLoader;
    private final AggregationChunkStorage<Object> aggregationChunkStorage;
    // Optional directory for sorting; when null, sortStream creates (and later deletes) a temporary one.
    private Path temporarySortDir;
    private final AggregationStructure structure;
    private AggregationState state;

    // Tunables, initialised from the DEFAULT_* constants declared above.
    private int chunkSize = DEFAULT_CHUNK_SIZE;
    private int reducerBufferSize = DEFAULT_REDUCER_BUFFER_SIZE;
    private int sorterItemsInMemory = DEFAULT_SORTER_ITEMS_IN_MEMORY;
    private Duration maxIncrementalReloadPeriod = DEFAULT_MAX_INCREMENTAL_RELOAD_PERIOD;
    private boolean ignoreChunkReadingExceptions = false;
    private int maxChunksToConsolidate = DEFAULT_MAX_CHUNKS_TO_CONSOLIDATE;
    private AggregationStats stats = new AggregationStats();

    // Consolidation bookkeeping exposed through the JMX getters.
    private long consolidationStarted;
    private long consolidationLastTimeMillis;
    private int consolidations;
    private Throwable consolidationLastError;

    /**
     * Stores the supplied collaborators; all other settings keep their field defaults.
     * Private: instances are obtained through {@link #create}.
     */
    private Aggregation(Eventloop eventloop, Executor executor, DefiningClassLoader classLoader, AggregationChunkStorage aggregationChunkStorage, AggregationStructure structure, AggregationState state) {
        this.eventloop = eventloop;
        this.executor = executor;
        this.classLoader = classLoader;
        // Raw parameter assigned to an AggregationChunkStorage<Object> field (unchecked).
        this.aggregationChunkStorage = aggregationChunkStorage;
        this.structure = structure;
        this.state = state;
    }

    /**
     * Creates an aggregation over the given structure with a new {@link AggregationState}
     * built from that same structure.
     *
     * @return a new instance using the default tunables
     */
    public static Aggregation create(Eventloop eventloop, Executor executor, DefiningClassLoader classLoader, AggregationChunkStorage aggregationChunkStorage, @NotNull AggregationStructure structure) {
        return new Aggregation(eventloop, executor, classLoader, aggregationChunkStorage, structure, new AggregationState(structure));
    }

    /** Sets {@code chunkSize} and returns this instance for chaining. */
    public Aggregation withChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
        return this;
    }

    /** Sets {@code reducerBufferSize} and returns this instance for chaining. */
    public Aggregation withReducerBufferSize(int reducerBufferSize) {
        this.reducerBufferSize = reducerBufferSize;
        return this;
    }

    /** Sets {@code sorterItemsInMemory} and returns this instance for chaining. */
    public Aggregation withSorterItemsInMemory(int sorterItemsInMemory) {
        this.sorterItemsInMemory = sorterItemsInMemory;
        return this;
    }

    /** Sets {@code maxIncrementalReloadPeriod} and returns this instance for chaining. */
    public Aggregation withMaxIncrementalReloadPeriod(Duration maxIncrementalReloadPeriod) {
        this.maxIncrementalReloadPeriod = maxIncrementalReloadPeriod;
        return this;
    }

    /** Sets the {@code ignoreChunkReadingExceptions} flag and returns this instance for chaining. */
    public Aggregation withIgnoreChunkReadingExceptions(boolean ignoreChunkReadingExceptions) {
        this.ignoreChunkReadingExceptions = ignoreChunkReadingExceptions;
        return this;
    }

    /** Sets {@code maxChunksToConsolidate} and returns this instance for chaining. */
    public Aggregation withMaxChunksToConsolidate(int maxChunksToConsolidate) {
        this.maxChunksToConsolidate = maxChunksToConsolidate;
        return this;
    }

    /**
     * Sets the directory used by the stream sorter and returns this instance for chaining.
     * When left {@code null}, a temporary directory is created per sort and deleted afterwards.
     */
    public Aggregation withTemporarySortDir(Path temporarySortDir) {
        this.temporarySortDir = temporarySortDir;
        return this;
    }

    /** Replaces the {@link AggregationStats} instance and returns this instance for chaining. */
    public Aggregation withStats(AggregationStats stats) {
        this.stats = stats;
        return this;
    }

    /** @return the structure this aggregation was created with */
    public AggregationStructure getStructure() {
        return structure;
    }

    /** @return the current state, or {@code null} after {@link #detachState()} */
    public AggregationState getState() {
        return state;
    }

    /** Replaces the current state. */
    public void setState(AggregationState state) {
        this.state = state;
    }

    /**
     * Hands the current state over to the caller and clears the internal reference.
     *
     * @return the state that was held before the call
     */
    public AggregationState detachState() {
        AggregationState detached = state;
        state = null;
        return detached;
    }

    /** @return the keys declared by the structure */
    public List<String> getKeys() {
        return structure.getKeys();
    }

    /** @return the measures declared by the structure */
    public List<String> getMeasures() {
        return structure.getMeasures();
    }

    /** @return key name to field type, as declared by the structure */
    public Map<String, FieldType> getKeyTypes() {
        return structure.getKeyTypes();
    }

    /** @return measure name to field type, as declared by the structure */
    public Map<String, FieldType> getMeasureTypes() {
        return structure.getMeasureTypes();
    }

    /** @return the partitioning key declared by the structure */
    public List<String> getPartitioningKey() {
        return structure.getPartitioningKey();
    }

    /**
     * Delegates to {@code AggregationUtils.aggregationReducer} using this aggregation's structure.
     */
    public <K extends Comparable, I, O, A> StreamReducers.Reducer<K, I, O, A> aggregationReducer(Class<I> inputClass, Class<O> outputClass, List<String> keys, List<String> measures, DefiningClassLoader classLoader) {
        return AggregationUtils.aggregationReducer(structure, inputClass, outputClass, keys, measures, classLoader);
    }

    /**
     * Streams {@code supplier} into a group reducer that writes chunks to the chunk storage,
     * and wraps the resulting chunks into an {@link AggregationDiff}.
     *
     * @param supplier      source of input records
     * @param inputClass    class of the input records
     * @param keyFields     mapping whose key set must equal this aggregation's keys
     * @param measureFields mapping whose key set must be a subset of this aggregation's measures
     * @return promise of a diff containing the chunks produced by the group reducer
     */
    public <T, C, K extends Comparable> Promise<AggregationDiff> consume(StreamSupplier<T> supplier, Class<T> inputClass, Map<String, String> keyFields, Map<String, String> measureFields) {
        boolean keysMatch = new HashSet<String>(this.getKeys()).equals(keyFields.keySet());
        Preconditions.checkArgument(keysMatch, "Expected keys: %s, actual keyFields: %s", new Object[]{this.getKeys(), keyFields});
        boolean measuresKnown = this.getMeasureTypes().keySet().containsAll(measureFields.keySet());
        Preconditions.checkArgument(measuresKnown, "Unknown measures: %s", new Object[]{CollectionUtils.difference(measureFields.keySet(), this.getMeasureTypes().keySet())});
        this.logger.info("Started consuming data in aggregation {}. Keys: {} Measures: {}", this, keyFields.keySet(), measureFields.keySet());

        Class groupKeyClass = AggregationUtils.createKeyClass(CollectionUtils.keysToMap(this.getKeys().stream(), this.structure.getKeyTypes()::get), this.classLoader);
        // Keep the structure's measure order, restricted to the measures present in the input.
        List<String> consumedMeasures = this.getMeasureTypes().keySet().stream()
                .filter(measureFields::containsKey)
                .collect(Collectors.toList());
        Class chunkRecordClass = AggregationUtils.createRecordClass(this.structure, this.getKeys(), consumedMeasures, this.classLoader);
        Aggregate preaggregator = AggregationUtils.createPreaggregator(this.structure, inputClass, chunkRecordClass, keyFields, measureFields, this.classLoader);
        Function keyExtractor = AggregationUtils.createKeyFunction(inputClass, groupKeyClass, this.getKeys(), this.classLoader);
        AggregationGroupReducer groupReducer = new AggregationGroupReducer(
                this.aggregationChunkStorage,
                this.structure,
                consumedMeasures,
                chunkRecordClass,
                AggregationUtils.createPartitionPredicate(chunkRecordClass, this.getPartitioningKey(), this.classLoader),
                keyExtractor,
                preaggregator,
                this.chunkSize,
                this.classLoader);
        return supplier.streamTo(groupReducer)
                .then($ -> groupReducer.getResult())
                .map(chunks -> AggregationDiff.of(new HashSet<AggregationChunk>((Collection<AggregationChunk>)chunks)));
    }

    /**
     * Convenience overload: derives the key and measure field mappings from {@code inputClass}
     * via {@code AggregationUtils.scanKeyFields} / {@code scanMeasureFields} and delegates
     * to the four-argument {@code consume}.
     */
    public <T> Promise<AggregationDiff> consume(StreamSupplier<T> supplier, Class<T> inputClass) {
        Map<String, String> scannedKeys = AggregationUtils.scanKeyFields(inputClass);
        Map<String, String> scannedMeasures = AggregationUtils.scanMeasureFields(inputClass);
        return this.consume(supplier, inputClass, scannedKeys, scannedMeasures);
    }

    /**
     * Estimates the cost of a query as the number of chunks the state reports for the query's
     * predicate and those requested measures that this aggregation declares.
     */
    public double estimateCost(AggregationQuery query) {
        List<String> ownMeasures = getMeasures();
        List<String> requestedOwnMeasures = query.getMeasures().stream()
                .filter(ownMeasures::contains)
                .collect(Collectors.toList());
        List<AggregationChunk> matchingChunks = state.findChunks(query.getPredicate(), requestedOwnMeasures);
        return matchingChunks.size();
    }

    /** Runs the query using this aggregation's own class loader. */
    public <T> StreamSupplier<T> query(AggregationQuery query, Class<T> outputClass) {
        DefiningClassLoader ownLoader = this.classLoader;
        return this.query(query, outputClass, ownLoader);
    }

    /**
     * Runs the query and returns a stream of {@code outputClass} records.
     *
     * @param queryClassLoader must be this aggregation's class loader or have it somewhere
     *                         in its parent chain, otherwise the precondition fails
     */
    @Override
    public <T> StreamSupplier<T> query(AggregationQuery query, Class<T> outputClass, DefiningClassLoader queryClassLoader) {
        boolean relatedLoader = CollectionUtils.iterate(queryClassLoader, Objects::nonNull, ClassLoader::getParent)
                .anyMatch(Predicate.isEqual(this.classLoader));
        Preconditions.checkArgument(relatedLoader, (Object)"Unrelated queryClassLoader");
        // Own measures, in declaration order, restricted to the ones the query asks for.
        List<String> selectedMeasures = this.getMeasures().stream()
                .filter(query.getMeasures()::contains)
                .collect(Collectors.toList());
        List<AggregationChunk> matchingChunks = this.state.findChunks(query.getPredicate(), selectedMeasures);
        return this.consolidatedSupplier(query.getKeys(), selectedMeasures, outputClass, query.getPredicate(), matchingChunks, queryClassLoader);
    }

    /**
     * Pipes {@code unsortedStream} through a {@link StreamSorter} ordered by {@code allKeys}.
     * Uses the configured sort directory, or creates one when none is configured; a created
     * directory is deleted once the sorted stream reaches its end (successfully or not).
     */
    private <T> StreamSupplier<T> sortStream(StreamSupplier<T> unsortedStream, Class<T> resultClass, List<String> allKeys, List<String> measures, DefiningClassLoader classLoader) {
        Comparator<T> byKey = AggregationUtils.createKeyComparator(resultClass, allKeys, classLoader);
        BinarySerializer<T> serializer = AggregationUtils.createBinarySerializer(this.structure, resultClass, this.getKeys(), measures, classLoader);
        Path sortDir = (Path)Utils.nullToSupplier((Object)this.temporarySortDir, this::createSortDir);

        StreamSorterStorage storage = (StreamSorterStorage)StreamSorterStorageImpl.create(this.executor, serializer, sortDir);
        StreamSorter sorter = StreamSorter.create(storage, Function.identity(), byKey, false, this.sorterItemsInMemory);
        StreamSupplier sorted = (StreamSupplier)unsortedStream.transformWith((StreamSupplierTransformer)sorter);

        // The field is re-read when the stream ends, not when the sort was set up.
        sorted.getEndOfStream().whenComplete(($, e) -> {
            if (this.temporarySortDir == null) {
                this.deleteSortDirSilent(sortDir);
            }
        });
        return sorted;
    }

    /**
     * Merges the given chunks through {@code consolidatedSupplier} and re-chunks the merged
     * stream with an {@link AggregationChunker}.
     *
     * @return promise of the chunks reported by the chunker
     */
    private Promise<List<AggregationChunk>> doConsolidation(List<AggregationChunk> chunksToConsolidate) {
        Set<String> knownMeasures = new HashSet<String>(this.getMeasures());
        // Measures that occur in at least one chunk and are still declared by this aggregation.
        Set<String> presentMeasures = chunksToConsolidate.stream()
                .flatMap(chunk -> chunk.getMeasures().stream())
                .filter(knownMeasures::contains)
                .collect(Collectors.toCollection(HashSet::new));
        // Restore the aggregation's declaration order.
        List<String> measures = this.getMeasures().stream()
                .filter(presentMeasures::contains)
                .collect(Collectors.toList());

        Class resultClass = AggregationUtils.createRecordClass(this.structure, this.getKeys(), measures, this.classLoader);
        StreamSupplier merged = this.consolidatedSupplier(this.getKeys(), measures, resultClass, AggregationPredicates.alwaysTrue(), chunksToConsolidate, this.classLoader);
        AggregationChunker chunker = AggregationChunker.create(
                this.structure,
                measures,
                resultClass,
                AggregationUtils.createPartitionPredicate(resultClass, this.getPartitioningKey(), this.classLoader),
                this.aggregationChunkStorage,
                this.classLoader,
                this.chunkSize);
        return merged.streamTo(chunker).then($ -> chunker.getResult());
    }

    /**
     * Appends {@code chunk} to a sequence in {@code planIndex}.
     * <p>
     * Sequences are grouped by the list of query fields the chunk provides and, within a group,
     * indexed by the max primary key of their last chunk. The chunk extends the sequence found
     * under the greatest key strictly below its min primary key, or starts a new sequence if
     * there is none.
     * <p>
     * The precondition only requires that the chunk provides at least one of the query fields.
     */
    private static void addChunkToPlan(Map<List<String>, TreeMap<PrimaryKey, List<QueryPlan.Sequence>>> planIndex, AggregationChunk chunk, List<String> queryFields) {
        List<String> sharedFields = new ArrayList<String>(queryFields);
        sharedFields.retainAll(chunk.getMeasures());
        Preconditions.checkArgument(!sharedFields.isEmpty(), (Object)"All of query fields are contained in measures of a chunk");

        TreeMap<PrimaryKey, List<QueryPlan.Sequence>> sequencesByMaxKey = planIndex.computeIfAbsent(sharedFields, k -> new TreeMap<>());
        Map.Entry<PrimaryKey, List<QueryPlan.Sequence>> predecessor = sequencesByMaxKey.lowerEntry(chunk.getMinPrimaryKey());

        QueryPlan.Sequence sequence;
        if (predecessor != null) {
            // Take the most recently added sequence ending at that key and drop the bucket if it becomes empty.
            List<QueryPlan.Sequence> candidates = predecessor.getValue();
            sequence = candidates.remove(candidates.size() - 1);
            if (candidates.isEmpty()) {
                sequencesByMaxKey.remove(predecessor.getKey());
            }
        } else {
            sequence = new QueryPlan.Sequence(sharedFields);
        }

        sequence.add(chunk);
        // Re-index the sequence under the max key of the chunk that now ends it.
        sequencesByMaxKey.computeIfAbsent(chunk.getMaxPrimaryKey(), k -> new ArrayList<>()).add(sequence);
    }

    /**
     * Builds a query plan: chunks are processed in ascending order of their min primary key,
     * each one is assigned to a sequence by {@code addChunkToPlan}, and all resulting
     * sequences are collected into a {@link QueryPlan}. The input list is not modified.
     */
    private static QueryPlan createPlan(List<AggregationChunk> chunks, List<String> queryFields) {
        List<AggregationChunk> byMinKey = new ArrayList<AggregationChunk>(chunks);
        byMinKey.sort(Comparator.comparing(AggregationChunk::getMinPrimaryKey));

        Map<List<String>, TreeMap<PrimaryKey, List<QueryPlan.Sequence>>> index = new HashMap<>();
        byMinKey.forEach(chunk -> Aggregation.addChunkToPlan(index, chunk, queryFields));

        List<QueryPlan.Sequence> sequences = index.values().stream()
                .flatMap(sequencesByMaxKey -> sequencesByMaxKey.values().stream())
                .flatMap(List::stream)
                .collect(Collectors.toCollection(ArrayList::new));
        return new QueryPlan(sequences);
    }

    /**
     * Plans the read of {@code individualChunks}, opens one stream per planned sequence
     * (sorting it first unless {@code queryKeys} is a prefix of the aggregation keys),
     * and merges all sequence streams into a single stream of {@code resultClass}.
     */
    private <R, S> StreamSupplier<R> consolidatedSupplier(List<String> queryKeys, List<String> measures, Class<R> resultClass, AggregationPredicate where, List<AggregationChunk> individualChunks, DefiningClassLoader queryClassLoader) {
        QueryPlan plan = Aggregation.createPlan(individualChunks, measures);
        this.logger.info("Query plan for {} in aggregation {}: {}", queryKeys, this, plan);

        // Chunk data is already in the required order when the query keys are a prefix of the aggregation keys.
        int prefixLength = Math.min(this.getKeys().size(), queryKeys.size());
        boolean alreadySorted = this.getKeys().subList(0, prefixLength).equals(queryKeys);

        ArrayList<SequenceStream<S>> sequenceStreams = new ArrayList<SequenceStream<S>>();
        for (QueryPlan.Sequence planned : plan.getSequences()) {
            Class sequenceClass = AggregationUtils.createRecordClass(this.structure, this.getKeys(), planned.getChunksFields(), this.classLoader);
            StreamSupplier chunkStream = this.sequenceStream(where, planned.getChunks(), sequenceClass, queryClassLoader);
            StreamSupplier orderedStream = alreadySorted
                    ? chunkStream
                    : this.sortStream(chunkStream, sequenceClass, queryKeys, planned.getQueryFields(), this.classLoader);
            sequenceStreams.add(new SequenceStream(orderedStream, planned.getQueryFields(), sequenceClass));
        }
        return this.mergeSequences(queryKeys, measures, resultClass, sequenceStreams, queryClassLoader);
    }

    /**
     * Combines the sequence streams into one stream of {@code resultClass}.
     * <p>
     * A single sequence queried by exactly the aggregation's key set is only mapped to the
     * result class. Otherwise every sequence is fed into a shared {@link StreamReducer}
     * keyed by {@code queryKeys}.
     */
    private <S, R, K extends Comparable> StreamSupplier<R> mergeSequences(List<String> queryKeys, List<String> measures, Class<R> resultClass, List<SequenceStream<S>> sequences, DefiningClassLoader classLoader) {
        boolean singleSequenceWithAllKeys = sequences.size() == 1
                && new HashSet<String>(queryKeys).equals(new HashSet<String>(this.getKeys()));
        if (singleSequenceWithAllKeys) {
            SequenceStream<S> only = sequences.get(0);
            List<String> onlyMeasures = measures.stream().filter(only.fields::contains).collect(Collectors.toList());
            Function mapper = AggregationUtils.createMapper(only.type, resultClass, queryKeys, onlyMeasures, classLoader);
            StreamSupplier mapped = (StreamSupplier)only.stream.transformWith((StreamSupplierTransformer)StreamMapper.create(mapper));
            return (StreamSupplier)mapped.transformWith(this.stats.mergeMapOutput);
        }

        StreamReducer streamReducer = StreamReducer.create(Comparable::compareTo);
        // Only override the reducer's buffer size for a non-zero, non-default setting.
        if (this.reducerBufferSize != 0 && this.reducerBufferSize != DEFAULT_REDUCER_BUFFER_SIZE) {
            streamReducer = streamReducer.withBufferSize(this.reducerBufferSize);
        }
        Class keyClass = AggregationUtils.createKeyClass(CollectionUtils.keysToMap(queryKeys.stream(), this.structure.getKeyTypes()::get), this.classLoader);
        for (SequenceStream<S> current : sequences) {
            Function extractKeyFunction = AggregationUtils.createKeyFunction(current.type, keyClass, queryKeys, this.classLoader);
            List<String> currentMeasures = measures.stream().filter(current.fields::contains).collect(Collectors.toList());
            StreamReducers.Reducer reducer = AggregationUtils.aggregationReducer(this.structure, current.type, resultClass, queryKeys, currentMeasures, classLoader);
            StreamConsumer reducerInput = (StreamConsumer)streamReducer.newInput(extractKeyFunction, reducer).transformWith(this.stats.mergeReducerInput);
            current.stream.streamTo(reducerInput);
        }
        return (StreamSupplier)streamReducer.getOutput().transformWith(this.stats.mergeReducerOutput);
    }

    /**
     * Concatenates the filtered readers of the given chunks into one stream. A chunk's reader
     * is only created when the concatenation asks the iterator for its next element.
     */
    private <T> StreamSupplier<T> sequenceStream(AggregationPredicate where, List<AggregationChunk> individualChunks, Class<T> sequenceClass, DefiningClassLoader queryClassLoader) {
        Iterator<AggregationChunk> remainingChunks = individualChunks.iterator();
        Iterator<StreamSupplier<T>> readers = new Iterator<StreamSupplier<T>>() {
            @Override
            public boolean hasNext() {
                return remainingChunks.hasNext();
            }

            @Override
            public StreamSupplier<T> next() {
                return Aggregation.this.chunkReaderWithFilter(where, remainingChunks.next(), sequenceClass, queryClassLoader);
            }
        };
        return StreamSupplier.concat((Iterator)readers);
    }

    /**
     * Opens a reader for {@code chunk} from the chunk storage and applies the {@code where}
     * predicate to it. The filter is skipped when {@code where} is the very instance returned
     * by {@code AggregationPredicates.alwaysTrue()} (reference comparison).
     */
    private <T> StreamSupplier<T> chunkReaderWithFilter(AggregationPredicate where, AggregationChunk chunk, Class<T> chunkRecordClass, DefiningClassLoader queryClassLoader) {
        StreamSupplier chunkReader = StreamSupplier.ofPromise(this.aggregationChunkStorage.read(this.structure, chunk.getMeasures(), chunkRecordClass, chunk.getChunkId(), this.classLoader));
        StreamSupplierTransformer filter = (StreamSupplierTransformer)(where != AggregationPredicates.alwaysTrue()
                ? StreamFilter.create(this.createPredicate(chunkRecordClass, where, queryClassLoader))
                : StreamSupplierTransformer.identity());
        return (StreamSupplier)chunkReader.transformWith(filter);
    }

    /**
     * Generates a {@link Predicate} class in {@code classLoader} whose {@code test(Object)}
     * casts its argument to {@code chunkRecordClass} and evaluates the expression produced
     * by {@code where.createPredicateDef}, then instantiates it.
     */
    private <T> Predicate<T> createPredicate(Class<T> chunkRecordClass, AggregationPredicate where, DefiningClassLoader classLoader) {
        ClassBuilder builder = ClassBuilder.create(classLoader, Predicate.class);
        Expression typedRecord = Expressions.cast((Expression)Expressions.arg(0), chunkRecordClass);
        Expression testBody = (Expression)where.createPredicateDef(typedRecord, this.getKeyTypes());
        return (Predicate)builder
                .withMethod("test", Boolean.TYPE, Collections.singletonList(Object.class), testBody)
                .buildClassAndCreateNewInstance();
    }

    /** @return the size of the collection returned by the state's {@code findOverlappingChunks()} */
    @JmxAttribute
    public int getNumberOfOverlappingChunks() {
        return state.findOverlappingChunks().size();
    }

    /** Consolidates chunks selected by the state's min-key strategy. */
    public Promise<AggregationDiff> consolidateMinKey() {
        return doConsolidate(false);
    }

    /** Consolidates chunks selected by the state's hot-segment strategy. */
    public Promise<AggregationDiff> consolidateHotSegment() {
        return doConsolidate(true);
    }

    /**
     * Selects chunks for consolidation, consolidates them and records timing/error statistics.
     *
     * @param hotSegment {@code true} to select chunks with the hot-segment strategy,
     *                   {@code false} for the min-key strategy
     * @return promise of a diff built from the chunks produced by the consolidation and the
     *         chunks that were selected for it; an empty diff when nothing was selected
     */
    private Promise<AggregationDiff> doConsolidate(boolean hotSegment) {
        List<AggregationChunk> chunks = hotSegment
                ? this.state.findChunksForConsolidationHotSegment(this.maxChunksToConsolidate)
                : this.state.findChunksForConsolidationMinKey(this.maxChunksToConsolidate, this.chunkSize);
        if (chunks.isEmpty()) {
            // Closing quote added so the aggregation name is quoted the same way as in the message below.
            this.logger.info("Nothing to consolidate in aggregation '{}'", this);
            return Promise.of(AggregationDiff.empty());
        }
        this.logger.info("Starting consolidation of aggregation '{}'", this);
        this.consolidationStarted = this.eventloop.currentTimeMillis();
        return this.doConsolidation(chunks)
                .whenComplete(($, e) -> {
                    if (e == null) {
                        this.consolidationLastTimeMillis = this.eventloop.currentTimeMillis() - this.consolidationStarted;
                        ++this.consolidations;
                        // NOTE(review): consolidationStarted is reset only on failure, so
                        // getConsolidationSeconds() keeps counting after a success - confirm this is intended.
                    } else {
                        this.consolidationStarted = 0L;
                        this.consolidationLastError = e;
                    }
                })
                // First argument: chunks written by the consolidation; second: the chunks it consumed.
                .map(consolidatedChunks -> AggregationDiff.of(
                        new LinkedHashSet<AggregationChunk>((Collection<AggregationChunk>)consolidatedChunks),
                        new LinkedHashSet<AggregationChunk>(chunks)));
    }

    /**
     * Creates a fresh temporary directory with the prefix {@code aggregation_sort_dir}.
     *
     * @throws UncheckedIOException wrapping the {@link IOException} if creation fails
     */
    private Path createSortDir() {
        Path created;
        try {
            created = Files.createTempDirectory("aggregation_sort_dir");
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return created;
    }

    /**
     * Best-effort removal of a sort directory: a failure is logged as a warning and
     * never propagated to the caller.
     *
     * @param sortDir the directory to delete
     */
    private void deleteSortDirSilent(Path sortDir) {
        try {
            Files.delete(sortDir);
        }
        catch (IOException e) {
            // Report the directory that was actually passed in. The temporarySortDir field is
            // null whenever sortStream calls this method, so logging it hid the real path.
            this.logger.warn("Could not delete temporal directory {} : {}", sortDir, e.toString());
        }
    }

    /**
     * Renders the ids of the given chunks as a single string, separated by {@code ", "},
     * in iteration order.
     *
     * @return the joined ids, or an empty string when {@code chunks} is empty
     */
    public static String getChunkIds(Iterable<AggregationChunk> chunks) {
        List<String> renderedIds = new ArrayList<String>();
        for (AggregationChunk chunk : chunks) {
            renderedIds.add(chunk.getChunkId().toString());
        }
        return String.join(", ", renderedIds);
    }

    /** @return the configured maximum incremental reload period */
    @JmxAttribute
    public Duration getMaxIncrementalReloadPeriod() {
        return maxIncrementalReloadPeriod;
    }

    /** Updates the maximum incremental reload period. */
    @JmxAttribute
    public void setMaxIncrementalReloadPeriod(Duration maxIncrementalReloadPeriod) {
        this.maxIncrementalReloadPeriod = maxIncrementalReloadPeriod;
    }

    /** @return the configured chunk size */
    @JmxAttribute
    public int getChunkSize() {
        return chunkSize;
    }

    /** Updates the chunk size. */
    @JmxAttribute
    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    /** @return the item count passed to the stream sorter as its in-memory limit */
    @JmxAttribute
    public int getSorterItemsInMemory() {
        return sorterItemsInMemory;
    }

    /** Updates the item count passed to the stream sorter. */
    @JmxAttribute
    public void setSorterItemsInMemory(int sorterItemsInMemory) {
        this.sorterItemsInMemory = sorterItemsInMemory;
    }

    /** @return the {@code ignoreChunkReadingExceptions} flag */
    @JmxAttribute
    public boolean isIgnoreChunkReadingExceptions() {
        return ignoreChunkReadingExceptions;
    }

    /** Updates the {@code ignoreChunkReadingExceptions} flag. */
    @JmxAttribute
    public void setIgnoreChunkReadingExceptions(boolean ignoreChunkReadingExceptions) {
        this.ignoreChunkReadingExceptions = ignoreChunkReadingExceptions;
    }

    /** @return the limit handed to the state when selecting chunks for consolidation */
    @JmxAttribute
    public int getMaxChunksToConsolidate() {
        return maxChunksToConsolidate;
    }

    /** Updates the limit handed to the state when selecting chunks for consolidation. */
    @JmxAttribute
    public void setMaxChunksToConsolidate(int maxChunksToConsolidate) {
        this.maxChunksToConsolidate = maxChunksToConsolidate;
    }

    /**
     * @return whole seconds elapsed since {@code consolidationStarted}, or {@code null}
     *         while that timestamp is zero
     */
    @JmxAttribute
    @Nullable
    public Integer getConsolidationSeconds() {
        if (this.consolidationStarted == 0L) {
            return null;
        }
        long elapsedMillis = this.eventloop.currentTimeMillis() - this.consolidationStarted;
        return (int)(elapsedMillis / 1000L);
    }

    /**
     * @return duration of the last successful consolidation in whole seconds, or {@code null}
     *         while the recorded duration is zero
     */
    @JmxAttribute
    @Nullable
    public Integer getConsolidationLastTimeSeconds() {
        if (this.consolidationLastTimeMillis == 0L) {
            return null;
        }
        return (int)(this.consolidationLastTimeMillis / 1000L);
    }

    /** @return number of consolidations that completed without an error */
    @JmxAttribute
    public int getConsolidations() {
        return consolidations;
    }

    /** @return the error of the most recent failed consolidation, or {@code null} if none failed */
    @JmxAttribute
    public Throwable getConsolidationLastError() {
        return consolidationLastError;
    }

    /** @return the size of the state's chunk collection */
    @JmxAttribute
    public int getChunks() {
        return state.getChunks().size();
    }

    /** @return the stats object used by this aggregation */
    @JmxAttribute
    public AggregationStats getStats() {
        return stats;
    }

    /** @return the eventloop supplied at construction */
    @NotNull
    public Eventloop getEventloop() {
        return eventloop;
    }

    /** Renders the aggregation as <code>{[keys] [measures]}</code> using the key and measure names. */
    @Override
    public String toString() {
        Set<String> keyNames = this.getKeyTypes().keySet();
        Set<String> measureNames = this.getMeasureTypes().keySet();
        return "{" + keyNames + " " + measureNames + '}';
    }

    /**
     * Immutable holder pairing a sequence's record stream with the fields it provides
     * and the record class of its items.
     */
    static final class SequenceStream<S> {
        // Stream of records belonging to one planned sequence.
        final StreamSupplier<S> stream;
        // Field names used to select which measures are mapped/reduced from this sequence.
        final List<String> fields;
        // Record class of the stream's items.
        final Class<S> type;

        private SequenceStream(StreamSupplier<S> stream, List<String> fields, Class<S> type) {
            this.stream = stream;
            this.fields = fields;
            this.type = type;
        }
    }
}

