/*
 * Decompiled with CFR 0.152.
 */
package io.annot8.common.implementations.pipelines;

import io.annot8.common.implementations.factories.ItemCreator;
import io.annot8.common.implementations.factories.NotifyingItemFactory;
import io.annot8.common.implementations.pipelines.ItemQueue;
import io.annot8.common.implementations.pipelines.Pipeline;
import io.annot8.core.components.Annot8Component;
import io.annot8.core.components.Processor;
import io.annot8.core.components.Resource;
import io.annot8.core.components.Source;
import io.annot8.core.components.responses.ProcessorResponse;
import io.annot8.core.components.responses.SourceResponse;
import io.annot8.core.data.Item;
import io.annot8.core.data.ItemFactory;
import io.annot8.core.exceptions.Annot8Exception;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single-threaded {@link Pipeline} that reads each configured {@link Source} in turn and
 * pushes every item the sources create through the configured {@link Processor}s in order.
 *
 * <p>Items created through the pipeline's item factory are added to the supplied
 * {@link ItemQueue}; the queue is drained after every call to a source's {@code read}.
 *
 * <p>{@link #run()} closes all sources, processors and resources once, after the last source
 * has finished, so that later sources are never read with already-closed components.
 */
public class SimplePipeline
implements Pipeline {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimplePipeline.class);
    private final String id = UUID.randomUUID().toString();
    private final NotifyingItemFactory itemFactory;
    private final ItemQueue itemQueue;
    private final Map<String, Resource> resources;
    private final List<Source> sources;
    private final List<Processor> processors;

    /**
     * Creates a pipeline and registers the queue as a listener on the item factory, so that
     * every item the factory reports is added to {@code itemQueue}.
     *
     * @param itemCreator used to build the notifying item factory handed to sources
     * @param itemQueue queue receiving newly created items; must not be {@code null}
     *     (the bound method reference below throws {@link NullPointerException} otherwise)
     * @param resources resources closed by {@link #close()}
     * @param sources sources read, in list order, by {@link #run()}
     * @param processors processors applied, in list order, to every item
     */
    public SimplePipeline(ItemCreator itemCreator, ItemQueue itemQueue, Map<String, Resource> resources, List<Source> sources, List<Processor> processors) {
        this.itemFactory = new NotifyingItemFactory(itemCreator);
        this.itemQueue = itemQueue;
        this.resources = resources;
        this.sources = sources;
        this.processors = processors;
        this.itemFactory.registerListener(itemQueue::add);
    }

    /**
     * Returns the random UUID string generated for this pipeline instance.
     *
     * @return this pipeline's identifier
     */
    public String getId() {
        return this.id;
    }

    /**
     * Reads every source to completion, in order, then closes the pipeline.
     *
     * <p>Closing happens exactly once, in a {@code finally} block, so components are released
     * even if a source or processor throws an unchecked exception.
     */
    @Override
    public void run() {
        try {
            for (Source source : this.sources) {
                this.process(source);
            }
        } finally {
            this.close();
        }
    }

    /**
     * Repeatedly reads from {@code source}, draining the item queue after each read, for as
     * long as the source reports {@code OK} or {@code EMPTY}. Any other status ends the loop.
     */
    private void process(Source source) {
        SourceResponse.Status status;
        do {
            SourceResponse response = source.read((ItemFactory)this.itemFactory);
            status = response.getStatus();
            this.processItemQueue();
        } while (status == SourceResponse.Status.OK || status == SourceResponse.Status.EMPTY);
        // Deliberately no close() here: closing is pipeline-wide and is done once by run().
    }

    /** Processes queued items until the queue reports that it has none left. */
    private void processItemQueue() {
        // itemQueue cannot be null here: the constructor's itemQueue::add would have thrown.
        while (this.itemQueue.hasItems()) {
            this.processItem(this.itemQueue.next());
        }
    }

    /**
     * Runs {@code item} through the processors in order.
     *
     * <p>Processing of the item stops early when the item has been discarded after an
     * {@code OK} response or when a processor reports {@code ITEM_ERROR}. An
     * {@link Annot8Exception} thrown by a processor is logged and the next processor is tried.
     */
    private void processItem(Item item) {
        for (Processor processor : this.processors) {
            try {
                ProcessorResponse.Status status = processor.process(item).getStatus();
                switch (status) {
                    case OK:
                        if (item.isDiscarded()) {
                            LOGGER.warn("Item discarded, stopping processing");
                            return;
                        }
                        break;
                    case PROCESSOR_ERROR:
                        LOGGER.error("Pipeline problem, exiting");
                        // NOTE(review): terminating the JVM from library code is drastic;
                        // kept because callers may rely on it. Consider throwing instead.
                        System.exit(1);
                        break;
                    case ITEM_ERROR:
                        LOGGER.error("Item problem, skipping rest of pipeline");
                        return;
                    default:
                        // Any other status: carry on with the next processor.
                        break;
                }
            }
            catch (Annot8Exception e) {
                // Trailing throwable argument is logged by SLF4J with its stack trace.
                LOGGER.error("Failed to process data item with processor {}", processor.getClass().getName(), e);
            }
        }
    }

    /** Closes all sources, then all processors, then all resources. */
    @Override
    public void close() {
        this.sources.forEach(Annot8Component::close);
        this.processors.forEach(Annot8Component::close);
        this.resources.values().forEach(Annot8Component::close);
    }
}

