/*
 * Decompiled with CFR 0.152.
 */
package kr.jm.utils.flow.subscriber;

import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import kr.jm.utils.exception.JMExceptionManager;
import kr.jm.utils.helper.JMLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Flow.Subscriber} that passes every non-null item to a configurable
 * {@link Consumer} and pulls items one at a time: a single item is requested on
 * subscription and one more after each {@code onNext} call.
 *
 * <p>Errors are forwarded to {@code JMExceptionManager.handleException} and
 * lifecycle events are logged through {@code JMLog}.
 *
 * @param <T> the type of the subscribed items
 */
public class JMSubscriber<T> implements Flow.Subscriber<T> {
    /** Logger named after the runtime class, so subclasses log under their own name. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Subscription received in {@link #onSubscribe}; {@code null} until then. */
    private Flow.Subscription subscription;
    /** Receives each non-null item; a {@code null} value is treated as "not set". */
    private Consumer<T> dataConsumer;

    /**
     * Creates a subscriber without a real data consumer. Until
     * {@link #setDataConsumer(Consumer)} is called, each received item is
     * reported through {@code JMExceptionManager} and otherwise dropped.
     */
    protected JMSubscriber() {
        this.dataConsumer = this::handleMissingDataConsumer;
    }

    /**
     * Creates a subscriber that passes received items to the given consumer.
     *
     * @param dataConsumer the consumer to invoke for each non-null item; if
     *                     {@code null}, items are reported as having no consumer
     */
    public JMSubscriber(Consumer<T> dataConsumer) {
        // Kept as a call to the setter (rather than a direct field write) so
        // subclasses that override setDataConsumer still see the initial value.
        this.setDataConsumer(dataConsumer);
    }

    /**
     * Stores the subscription and requests the first item.
     *
     * @param subscription the subscription handed over by the publisher
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        JMLog.info(this.log, "onSubscribe", new Object[]{subscription});
        this.subscription = subscription;
        this.requestNext(subscription);
    }

    /** Requests exactly one more item from the given subscription. */
    private void requestNext(Flow.Subscription subscription) {
        subscription.request(1L);
    }

    /**
     * Passes a non-null item to the data consumer, then requests the next item
     * if a subscription has been stored. A {@code null} item skips the consumer
     * but still requests the next item.
     *
     * <p>If the consumer throws, the exception propagates to the caller and no
     * further item is requested by this call.
     *
     * @param item the received item
     */
    @Override
    public void onNext(T item) {
        JMLog.debug(this.log, "onNext", new Object[]{item});
        if (item != null) {
            Consumer<T> consumer = this.dataConsumer;
            if (consumer == null) {
                // A null consumer used to raise a NullPointerException here for
                // every item, before the next request was issued; report the
                // missing consumer instead so the flow keeps going.
                consumer = this::handleMissingDataConsumer;
            }
            consumer.accept(item);
        }
        Flow.Subscription current = this.subscription;
        if (current != null) {
            this.requestNext(current);
        }
    }

    /**
     * Forwards the error to {@code JMExceptionManager.handleException}.
     *
     * @param throwable the error signalled by the publisher
     */
    @Override
    public void onError(Throwable throwable) {
        JMExceptionManager.handleException(this.log, throwable, "onError", new Object[0]);
    }

    /** Logs the completion signal. */
    @Override
    public void onComplete() {
        JMLog.info(this.log, "onComplete");
    }

    /**
     * Replaces the consumer that receives each non-null item.
     *
     * @param dataConsumer the new consumer; if {@code null}, subsequent items
     *                     are reported as having no consumer
     */
    public void setDataConsumer(Consumer<T> dataConsumer) {
        this.dataConsumer = dataConsumer;
    }

    /**
     * Reports, through {@code JMExceptionManager}, an item that arrived while
     * no data consumer was configured.
     */
    private void handleMissingDataConsumer(T d) {
        JMExceptionManager.handleException(this.log, (Throwable)JMExceptionManager.newRunTimeException("DataConsumer Wasn't Set !!! - Flush " + d), "JMSubscriber", new Object[0]);
    }
}

