package org.apache.flink.cep.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.class */
public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function> extends AbstractUdfStreamOperator<OUT, F> implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
    private static final long serialVersionUID = -4166778210774160757L;
    private final boolean isProcessingTime;
    private final TypeSerializer<IN> inputSerializer;
    private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
    private transient ValueState<NFA<IN>> nfaOperatorState;
    private transient MapState<Long, List<IN>> elementQueueState;
    private final NFACompiler.NFAFactory<IN> nfaFactory;
    private transient InternalTimerService<VoidNamespace> timerService;
    private long lastWatermark;
    private final EventComparator<IN> comparator;
    protected final AfterMatchSkipStrategy afterMatchSkipStrategy;

    public AbstractKeyedCEPPatternOperator(TypeSerializer<IN> typeSerializer, boolean z, NFACompiler.NFAFactory<IN> nFAFactory, EventComparator<IN> eventComparator, AfterMatchSkipStrategy afterMatchSkipStrategy, F f) {
        super(f);
        this.inputSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.isProcessingTime = ((Boolean) Preconditions.checkNotNull(Boolean.valueOf(z))).booleanValue();
        this.nfaFactory = (NFACompiler.NFAFactory) Preconditions.checkNotNull(nFAFactory);
        this.comparator = eventComparator;
        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        if (this.nfaOperatorState == null) {
            this.nfaOperatorState = getRuntimeContext().getState(new ValueStateDescriptor(NFA_OPERATOR_STATE_NAME, new NFA.NFASerializer(this.inputSerializer)));
        }
        if (this.elementQueueState == null) {
            this.elementQueueState = getRuntimeContext().getMapState(new MapStateDescriptor(EVENT_QUEUE_STATE_NAME, LongSerializer.INSTANCE, new ListSerializer(this.inputSerializer)));
        }
    }

    public void open() throws Exception {
        super.open();
        this.timerService = getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        if (!this.isProcessingTime) {
            long timestamp = streamRecord.getTimestamp();
            Object value = streamRecord.getValue();
            if (timestamp > this.lastWatermark) {
                saveRegisterWatermarkTimer();
                bufferEvent(value, timestamp);
                return;
            }
            return;
        }
        if (this.comparator == null) {
            NFA nfa = getNFA();
            processEvent(nfa, streamRecord.getValue(), getProcessingTimeService().getCurrentProcessingTime());
            updateNFA(nfa);
        } else {
            long currentProcessingTime = this.timerService.currentProcessingTime();
            bufferEvent(streamRecord.getValue(), currentProcessingTime);
            this.timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentProcessingTime + 1);
        }
    }

    private void saveRegisterWatermarkTimer() {
        long currentWatermark = this.timerService.currentWatermark();
        if (currentWatermark + 1 > currentWatermark) {
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
        }
    }

    private void bufferEvent(IN in, long j) throws Exception {
        List list = (List) this.elementQueueState.get(Long.valueOf(j));
        if (list == null) {
            list = new ArrayList();
        }
        if (getExecutionConfig().isObjectReuseEnabled()) {
            list.add(this.inputSerializer.copy(in));
        } else {
            list.add(in);
        }
        this.elementQueueState.put(Long.valueOf(j), list);
    }

    public void onEventTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFA<IN> nfa = getNFA();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek().longValue() <= this.timerService.currentWatermark()) {
            long longValue = sortedTimestamps.poll().longValue();
            sort((Iterable) this.elementQueueState.get(Long.valueOf(longValue))).forEachOrdered(obj -> {
                processEvent(nfa, obj, longValue);
            });
            this.elementQueueState.remove(Long.valueOf(longValue));
        }
        advanceTime(nfa, this.timerService.currentWatermark());
        if (sortedTimestamps.isEmpty()) {
            this.elementQueueState.clear();
        }
        updateNFA(nfa);
        if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
            saveRegisterWatermarkTimer();
        }
        updateLastSeenWatermark(this.timerService.currentWatermark());
    }

    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFA<IN> nfa = getNFA();
        while (!sortedTimestamps.isEmpty()) {
            long longValue = sortedTimestamps.poll().longValue();
            sort((Iterable) this.elementQueueState.get(Long.valueOf(longValue))).forEachOrdered(obj -> {
                processEvent(nfa, obj, longValue);
            });
            this.elementQueueState.remove(Long.valueOf(longValue));
        }
        if (sortedTimestamps.isEmpty()) {
            this.elementQueueState.clear();
        }
        updateNFA(nfa);
    }

    private Stream<IN> sort(Iterable<IN> iterable) {
        Stream<IN> stream = StreamSupport.stream(iterable.spliterator(), false);
        return this.comparator == null ? stream : stream.sorted(this.comparator);
    }

    private void updateLastSeenWatermark(long j) {
        this.lastWatermark = j;
    }

    private NFA<IN> getNFA() throws IOException {
        NFA<IN> nfa = (NFA) this.nfaOperatorState.value();
        return nfa != null ? nfa : this.nfaFactory.createNFA();
    }

    private void updateNFA(NFA<IN> nfa) throws IOException {
        if (nfa.isNFAChanged()) {
            if (nfa.isEmpty()) {
                this.nfaOperatorState.clear();
            } else {
                nfa.resetNFAChanged();
                this.nfaOperatorState.update(nfa);
            }
        }
    }

    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> priorityQueue = new PriorityQueue<>();
        Iterator it = this.elementQueueState.keys().iterator();
        while (it.hasNext()) {
            priorityQueue.offer((Long) it.next());
        }
        return priorityQueue;
    }

    private void processEvent(NFA<IN> nfa, IN in, long j) {
        Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> process = nfa.process(in, j, this.afterMatchSkipStrategy);
        try {
            processMatchedSequences((Iterable) process.f0, j);
            processTimedOutSequences((Iterable) process.f1, j);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void advanceTime(NFA<IN> nfa, long j) throws Exception {
        processEvent(nfa, null, j);
    }

    protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> iterable, long j) throws Exception;

    protected void processTimedOutSequences(Iterable<Tuple2<Map<String, List<IN>>, Long>> iterable, long j) throws Exception {
    }

    @VisibleForTesting
    public boolean hasNonEmptyNFA(KEY key) throws IOException {
        setCurrentKey(key);
        return this.nfaOperatorState.value() != null;
    }

    @VisibleForTesting
    public boolean hasNonEmptyPQ(KEY key) throws Exception {
        setCurrentKey(key);
        return this.elementQueueState.keys().iterator().hasNext();
    }

    @VisibleForTesting
    public int getPQSize(KEY key) throws Exception {
        setCurrentKey(key);
        int i = 0;
        Iterator it = this.elementQueueState.values().iterator();
        while (it.hasNext()) {
            i += ((List) it.next()).size();
        }
        return i;
    }
}
