/*
 * Decompiled with CFR 0.152.
 */
package kr.jm.utils.flow.processor;

import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Function;
import kr.jm.utils.exception.JMExceptionManager;
import kr.jm.utils.flow.processor.JMProcessorInterface;
import kr.jm.utils.flow.publisher.JMSubmissionPublisher;
import kr.jm.utils.flow.subscriber.JMSubscriber;
import kr.jm.utils.flow.subscriber.JMSubscriberBuilder;
import kr.jm.utils.helper.JMLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A flow processor that applies a transform function to every item it
 * receives and republishes the non-null results to its own subscribers.
 *
 * <p>Incoming subscriber signals ({@code onSubscribe}, {@code onNext},
 * {@code onError}, {@code onComplete}) are delegated to an internal
 * {@link JMSubscriber} that was built around {@link #process(Object)}.
 * Outgoing subscriptions are delegated to an internal
 * {@link JMSubmissionPublisher}.
 *
 * <p>Within this class the output publisher is closed only by
 * {@link #close()}; {@link #onComplete()} and {@link #onError(Throwable)}
 * merely forward the signal to the input subscriber.
 *
 * @param <T> type of the items consumed by this processor
 * @param <R> type of the items published by this processor
 */
public class JMProcessor<T, R> implements JMProcessorInterface<T, R>, AutoCloseable {
    /** Logger named after the runtime class, so subclasses log under their own name. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());

    /** Function applied to each input item by {@link #process(Object)}. */
    private final Function<T, R> transformFunction;
    /** Publisher that receives every non-null transformation result. */
    private final JMSubmissionPublisher<R> outputPublisher;
    /** Subscriber that receives the upstream signals forwarded by this processor. */
    private final JMSubscriber<T> inputSubscriber;

    /**
     * Creates a processor around the given transform function.
     *
     * @param transformFunction function applied to each input item; when it
     *                          returns {@code null} nothing is published for
     *                          that item
     */
    public JMProcessor(Function<T, R> transformFunction) {
        this.transformFunction = transformFunction;
        // Diamond instead of the raw type: avoids an unchecked conversion.
        this.outputPublisher = new JMSubmissionPublisher<>();
        this.inputSubscriber = JMSubscriberBuilder.build(this::process);
    }

    /**
     * Transforms one input item and submits the result to the output
     * publisher, skipping {@code null} results.
     *
     * <p>Any {@link Exception} thrown while transforming or submitting is
     * caught and handed to {@link JMExceptionManager#handleException}
     * together with the offending input, instead of propagating.
     *
     * @param input the item to transform
     */
    protected void process(T input) {
        try {
            Optional.ofNullable(transformFunction.apply(input))
                    .ifPresent(outputPublisher::submit);
        } catch (Exception e) {
            JMExceptionManager.handleException(log, e, "process", new Object[]{input});
        }
    }

    /**
     * Logs the subscription at info level and forwards it to the input
     * subscriber.
     *
     * @param subscription the subscription handed over by the upstream publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        JMLog.info(log, "onSubscribe", new Object[]{subscription});
        inputSubscriber.onSubscribe(subscription);
    }

    /**
     * Logs the item at debug level and forwards it to the input subscriber.
     *
     * @param item the item received from upstream
     */
    @Override
    public void onNext(T item) {
        JMLog.debug(log, "onNext", new Object[]{item});
        inputSubscriber.onNext(item);
    }

    /**
     * Forwards an upstream error to the input subscriber. This method itself
     * does not log and does not touch the output publisher.
     *
     * @param throwable the error signalled by upstream
     */
    @Override
    public void onError(Throwable throwable) {
        inputSubscriber.onError(throwable);
    }

    /**
     * Logs the completion at info level and forwards it to the input
     * subscriber. The output publisher is not closed here.
     */
    @Override
    public void onComplete() {
        JMLog.info(log, "onComplete");
        inputSubscriber.onComplete();
    }

    /**
     * Logs the request at info level and registers the subscriber with the
     * output publisher.
     *
     * @param subscriber the downstream subscriber to register
     */
    @Override
    public void subscribe(Flow.Subscriber<? super R> subscriber) {
        JMLog.info(log, "subscribe", new Object[]{subscriber});
        outputPublisher.subscribe(subscriber);
    }

    /** Logs the call at info level and closes the output publisher. */
    @Override
    public void close() {
        JMLog.info(log, "close");
        outputPublisher.close();
    }
}

