/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.collector;

import io.activej.dataflow.DataflowClient;
import io.activej.dataflow.dataset.Dataset;
import io.activej.dataflow.graph.DataflowContext;
import io.activej.dataflow.graph.DataflowGraph;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.node.NodeUpload;
import io.activej.datastream.StreamSupplier;
import java.util.ArrayList;
import java.util.List;

/**
 * Wires a {@link Dataset} into a {@link DataflowGraph} so that its output
 * streams can be read back through a {@link DataflowClient} as a single
 * {@link StreamSupplier}.
 *
 * @param <T> type of the items produced by the collected dataset
 */
public final class Collector<T> {
    private final Dataset<T> input;
    private final DataflowClient client;

    /**
     * Creates a collector for the given dataset.
     *
     * @param input  dataset whose channels are collected
     * @param client client used to request a download for every channel
     */
    public Collector(Dataset<T> input, DataflowClient client) {
        this.input = input;
        this.client = client;
    }

    /**
     * Registers one {@link NodeUpload} per input stream in {@code graph} and
     * returns the concatenation of the suppliers obtained from the client for
     * those streams.
     *
     * <p>All upload nodes created by a single call share the same node index,
     * generated once from the {@link DataflowContext} built for {@code graph}.
     * The resulting supplier yields the per-stream suppliers in the order the
     * stream ids were returned by {@code input.channels(context)}.
     *
     * @param graph graph that receives the upload nodes; it is mutated by this call
     * @return a supplier concatenating the downloads of every input stream
     */
    public StreamSupplier<T> compile(DataflowGraph graph) {
        DataflowContext context = DataflowContext.of(graph);
        List<StreamId> inputStreamIds = this.input.channels(context);
        // Parameterized (not raw) so that the element type is known to both
        // StreamSupplier.concat(...) and the closeEx(...) lambda below.
        List<StreamSupplier<T>> suppliers = new ArrayList<>();
        // One index is generated up front and reused for every upload node.
        int index = context.generateNodeIndex();
        for (StreamId streamId : inputStreamIds) {
            NodeUpload<T> nodeUpload = new NodeUpload<>(index, this.input.valueType(), streamId);
            // The upload node is added to the partition the graph associates with the stream.
            Partition partition = graph.getPartition(streamId);
            graph.addNode(partition, nodeUpload);
            StreamSupplier<T> supplier = this.client.download(partition.getAddress(), streamId, this.input.valueType());
            suppliers.add(supplier);
        }
        // If the concatenated stream ends with an exception, pass that exception
        // to every per-stream supplier so none of them is left open.
        return StreamSupplier.concat(suppliers)
                .withEndOfStream(eos -> eos
                        .whenException(e -> suppliers.forEach(supplier -> supplier.closeEx(e))));
    }
}

