package org.apache.flink.cep.operator;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/cep/operator/AbstractCEPPatternOperator.class */
/**
 * Base CEP pattern operator that keeps a single {@link NFA} and a single priority queue of
 * buffered {@link StreamRecord}s as plain fields of the operator instance.
 *
 * <p>Buffered records are released to the NFA when a watermark arrives (see
 * {@link #processWatermark(Watermark)}). Both the NFA and the buffered records are written to and
 * read back from the checkpoint stream by {@link #snapshotState(FSDataOutputStream, long, long)}
 * and {@link #restoreState(FSDataInputStream)}.
 *
 * @param <IN> type of the input events
 * @param <OUT> type of the emitted elements
 */
public abstract class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
    private static final long serialVersionUID = 7487334510746595640L;

    /** Initial capacity of the priority queue created in {@link #open()}. */
    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;

    /** Serializer used to write/read the buffered records in snapshots. */
    private final StreamElementSerializer<IN> streamRecordSerializer;

    /** The NFA driven by this operator; replaced wholesale on restore. */
    private NFA<IN> nfa;

    /** Buffered records ordered by {@code StreamRecordComparator}; created in open() or restoreState(). */
    private transient PriorityQueue<StreamRecord<IN>> priorityQueue;

    /**
     * Creates the operator and its NFA.
     *
     * @param typeSerializer serializer for the input type; also wrapped for record snapshots
     * @param z flag forwarded unchanged to the base class constructor
     * @param nFAFactory factory used once to create the NFA instance
     */
    public AbstractCEPPatternOperator(TypeSerializer<IN> typeSerializer, boolean z, NFACompiler.NFAFactory<IN> nFAFactory) {
        super(typeSerializer, z);
        this.streamRecordSerializer = new StreamElementSerializer<>(typeSerializer);
        this.nfa = nFAFactory.createNFA();
    }

    /**
     * Opens the operator and creates an empty priority queue unless one already exists
     * (e.g. because {@link #restoreState(FSDataInputStream)} populated it beforehand).
     */
    public void open() throws Exception {
        super.open();
        if (this.priorityQueue == null) {
            this.priorityQueue = createPriorityQueue(INITIAL_PRIORITY_QUEUE_CAPACITY);
        }
    }

    /** Returns the single NFA instance held by this operator. */
    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected NFA<IN> getNFA() throws IOException {
        return this.nfa;
    }

    /**
     * Intentionally a no-op: {@link #getNFA()} hands out the field instance itself, so there is
     * nothing to write back. NOTE(review): presumably the base class mutates that instance in
     * place - confirm against AbstractCEPBasePatternOperator.
     */
    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected void updateNFA(NFA<IN> nfa) {
    }

    /** Returns the single priority queue held by this operator. */
    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
        return this.priorityQueue;
    }

    /**
     * Intentionally a no-op: {@link #getPriorityQueue()} hands out the field instance itself, so
     * there is nothing to write back.
     */
    @Override // org.apache.flink.cep.operator.AbstractCEPBasePatternOperator
    protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> priorityQueue) {
    }

    /**
     * Handles a watermark: feeds every buffered record whose timestamp is not after the watermark
     * into the NFA (in queue order), then forwards the watermark downstream.
     *
     * <p>If nothing is buffered, only the NFA's time is advanced to the watermark timestamp.
     * NOTE(review): {@code advanceTime} is not called when the queue is non-empty - confirm this
     * is intended.
     *
     * @param watermark the watermark to process and re-emit
     */
    public void processWatermark(Watermark watermark) throws Exception {
        final long watermarkTimestamp = watermark.getTimestamp();
        if (this.priorityQueue.isEmpty()) {
            advanceTime(this.nfa, watermarkTimestamp);
        } else {
            while (!this.priorityQueue.isEmpty() && this.priorityQueue.peek().getTimestamp() <= watermarkTimestamp) {
                StreamRecord<IN> record = this.priorityQueue.poll();
                processEvent(this.nfa, record.getValue(), record.getTimestamp());
            }
        }
        this.output.emitWatermark(watermark);
    }

    /**
     * Writes the operator state to the given stream. Layout: the Java-serialized NFA, the number
     * of buffered records as an int, then each buffered record written with the record serializer.
     *
     * @param fSDataOutputStream stream receiving the snapshot
     * @param j not used by this method
     * @param j2 not used by this method
     */
    public void snapshotState(FSDataOutputStream fSDataOutputStream, long j, long j2) throws Exception {
        // Not closed here: closing the wrapper would also close the caller-supplied stream.
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(fSDataOutputStream);
        objectOutputStream.writeObject(this.nfa);
        objectOutputStream.writeInt(this.priorityQueue.size());

        // One wrapper for all records instead of allocating a new one per element.
        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(objectOutputStream);
        for (StreamRecord<IN> record : this.priorityQueue) {
            this.streamRecordSerializer.serialize(record, outputView);
        }
        objectOutputStream.flush();
    }

    /**
     * Restores the operator state written by {@link #snapshotState(FSDataOutputStream, long, long)}:
     * replaces the NFA and rebuilds the priority queue from the serialized records.
     *
     * <p>A snapshot taken with an empty queue (record count 0) is restored as an empty queue.
     *
     * @param fSDataInputStream stream positioned at the start of a snapshot
     * @throws IllegalArgumentException if the stored record count is negative
     */
    public void restoreState(FSDataInputStream fSDataInputStream) throws Exception {
        ObjectInputStream objectInputStream = new ObjectInputStream(fSDataInputStream);

        @SuppressWarnings("unchecked") // snapshotState wrote an NFA<IN> at this position
        NFA<IN> restoredNfa = (NFA<IN>) objectInputStream.readObject();
        this.nfa = restoredNfa;

        int recordCount = objectInputStream.readInt();
        if (recordCount < 0) {
            throw new IllegalArgumentException("Invalid number of buffered records in snapshot: " + recordCount);
        }
        this.priorityQueue = createPriorityQueue(recordCount);

        // One wrapper for all records instead of allocating a new one per element.
        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(objectInputStream);
        for (int i = 0; i < recordCount; i++) {
            this.priorityQueue.offer(this.streamRecordSerializer.deserialize(inputView).asRecord());
        }
    }

    /**
     * Creates an empty priority queue ordered by {@code StreamRecordComparator}.
     *
     * @param capacityHint desired initial capacity; clamped to at least 1 because
     *     {@link PriorityQueue} rejects an initial capacity below 1
     */
    private PriorityQueue<StreamRecord<IN>> createPriorityQueue(int capacityHint) {
        return new PriorityQueue<>(Math.max(1, capacityHint), new StreamRecordComparator());
    }
}
