/*
 * Decompiled with CFR 0.152.
 */
package kr.jm.utils.flow.processor;

import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import kr.jm.utils.flow.processor.JMTransformProcessor;
import kr.jm.utils.flow.publisher.SubmissionPublisherImplementsJM;
import kr.jm.utils.flow.subscriber.JMSubscriberBuilder;
import kr.jm.utils.helper.JMLog;
import kr.jm.utils.helper.JMThread;
import org.slf4j.Logger;

public class JMConcurrentTransformProcessor<T, R>
extends JMTransformProcessor<T, R>
implements AutoCloseable {
    private SubmissionPublisher<R> submissionPublisher;

    public JMConcurrentTransformProcessor(Function<T, R> transformerFunction) {
        this(Flow.defaultBufferSize(), transformerFunction);
    }

    public JMConcurrentTransformProcessor(int maxBufferCapacity, Function<T, R> transformerFunction) {
        this(null, maxBufferCapacity, transformerFunction);
    }

    public JMConcurrentTransformProcessor(Executor executor, int maxBufferCapacity, Function<T, R> transformerFunction) {
        super(transformerFunction);
        this.submissionPublisher = new SubmissionPublisherImplementsJM<R>(executor == null ? JMThread.getCommonPool() : executor, maxBufferCapacity);
        super.subscribe(JMSubscriberBuilder.build(this.submissionPublisher::submit));
    }

    @Override
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        this.submissionPublisher.subscribe(subscriber);
    }

    @Override
    public void close() {
        JMLog.info((Logger)this.log, (String)"close");
        this.submissionPublisher.close();
    }
}

