package org.apache.flink.cep.operator;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Migration;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.class */
public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace>, CheckpointedRestoringOperator {
    private static final long serialVersionUID = -4166778210774160757L;
    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    private final boolean isProcessingTime;
    private final TypeSerializer<IN> inputSerializer;
    private final TypeSerializer<KEY> keySerializer;
    private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
    private transient ValueState<NFA<IN>> nfaOperatorState;
    private transient MapState<Long, List<IN>> elementQueueState;
    private final NFACompiler.NFAFactory<IN> nfaFactory;
    private transient InternalTimerService<VoidNamespace> timerService;
    private long lastWatermark;
    private final boolean migratingFromOldKeyedOperator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator$PriorityQueueFactory.class */
    public interface PriorityQueueFactory<T> extends Serializable {
        PriorityQueue<T> createPriorityQueue();
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator$PriorityQueueSerializer.class */
    private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
        private static final long serialVersionUID = -231980397616187715L;
        private final TypeSerializer<T> elementSerializer;
        private final PriorityQueueFactory<T> factory;

        PriorityQueueSerializer(TypeSerializer<T> typeSerializer, PriorityQueueFactory<T> priorityQueueFactory) {
            this.elementSerializer = typeSerializer;
            this.factory = priorityQueueFactory;
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<PriorityQueue<T>> duplicate() {
            return new PriorityQueueSerializer(this.elementSerializer.duplicate(), this.factory);
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        public PriorityQueue<T> m13createInstance() {
            return this.factory.createPriorityQueue();
        }

        /* JADX WARN: Multi-variable type inference failed */
        public PriorityQueue<T> copy(PriorityQueue<T> priorityQueue) {
            PriorityQueue<T> createPriorityQueue = this.factory.createPriorityQueue();
            Iterator<T> it = priorityQueue.iterator();
            while (it.hasNext()) {
                createPriorityQueue.offer(this.elementSerializer.copy(it.next()));
            }
            return createPriorityQueue;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public PriorityQueue<T> copy(PriorityQueue<T> priorityQueue, PriorityQueue<T> priorityQueue2) {
            priorityQueue2.clear();
            Iterator<T> it = priorityQueue.iterator();
            while (it.hasNext()) {
                priorityQueue2.offer(this.elementSerializer.copy(it.next()));
            }
            return priorityQueue2;
        }

        public int getLength() {
            return 0;
        }

        public void serialize(PriorityQueue<T> priorityQueue, DataOutputView dataOutputView) throws IOException {
            dataOutputView.writeInt(priorityQueue.size());
            Iterator<T> it = priorityQueue.iterator();
            while (it.hasNext()) {
                this.elementSerializer.serialize(it.next(), dataOutputView);
            }
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public PriorityQueue<T> m12deserialize(DataInputView dataInputView) throws IOException {
            return deserialize((PriorityQueue) this.factory.createPriorityQueue(), dataInputView);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public PriorityQueue<T> deserialize(PriorityQueue<T> priorityQueue, DataInputView dataInputView) throws IOException {
            priorityQueue.clear();
            int readInt = dataInputView.readInt();
            for (int i = 0; i < readInt; i++) {
                priorityQueue.offer(this.elementSerializer.deserialize(dataInputView));
            }
            return priorityQueue;
        }

        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof PriorityQueueSerializer)) {
                return false;
            }
            PriorityQueueSerializer priorityQueueSerializer = (PriorityQueueSerializer) obj;
            return this.factory.equals(priorityQueueSerializer.factory) && this.elementSerializer.equals(priorityQueueSerializer.elementSerializer);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof PriorityQueueSerializer;
        }

        public int hashCode() {
            return Objects.hash(this.factory, this.elementSerializer);
        }

        public TypeSerializerConfigSnapshot snapshotConfiguration() {
            return new CollectionSerializerConfigSnapshot(this.elementSerializer);
        }

        public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) {
            if (typeSerializerConfigSnapshot instanceof CollectionSerializerConfigSnapshot) {
                Tuple2 singleNestedSerializerAndConfig = ((CollectionSerializerConfigSnapshot) typeSerializerConfigSnapshot).getSingleNestedSerializerAndConfig();
                CompatibilityResult resolveCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult((TypeSerializer) singleNestedSerializerAndConfig.f0, UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot) singleNestedSerializerAndConfig.f1, this.elementSerializer);
                if (!resolveCompatibilityResult.isRequiresMigration()) {
                    return CompatibilityResult.compatible();
                }
                if (resolveCompatibilityResult.getConvertDeserializer() != null) {
                    return CompatibilityResult.requiresMigration(new PriorityQueueSerializer(new TypeDeserializerAdapter(resolveCompatibilityResult.getConvertDeserializer()), this.factory));
                }
            }
            return CompatibilityResult.requiresMigration();
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator$PriorityQueueStreamRecordFactory.class */
    private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
        private static final long serialVersionUID = 1254766984454616593L;

        private PriorityQueueStreamRecordFactory() {
        }

        @Override // org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.PriorityQueueFactory
        public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
            return new PriorityQueue<>(11, new StreamRecordComparator());
        }

        public boolean equals(Object obj) {
            return obj instanceof PriorityQueueStreamRecordFactory;
        }

        public int hashCode() {
            return getClass().hashCode();
        }
    }

    public AbstractKeyedCEPPatternOperator(TypeSerializer<IN> typeSerializer, boolean z, TypeSerializer<KEY> typeSerializer2, NFACompiler.NFAFactory<IN> nFAFactory, boolean z2) {
        this.inputSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.isProcessingTime = ((Boolean) Preconditions.checkNotNull(Boolean.valueOf(z))).booleanValue();
        this.keySerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer2);
        this.nfaFactory = (NFACompiler.NFAFactory) Preconditions.checkNotNull(nFAFactory);
        this.migratingFromOldKeyedOperator = z2;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        if (this.nfaOperatorState == null) {
            this.nfaOperatorState = getRuntimeContext().getState(new ValueStateDescriptor(NFA_OPERATOR_STATE_NAME, new NFA.NFASerializer(this.inputSerializer)));
        }
        if (this.elementQueueState == null) {
            this.elementQueueState = getRuntimeContext().getMapState(new MapStateDescriptor(EVENT_QUEUE_STATE_NAME, LongSerializer.INSTANCE, new ListSerializer(this.inputSerializer)));
        }
    }

    public void open() throws Exception {
        super.open();
        this.timerService = getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        if (this.isProcessingTime) {
            NFA nfa = getNFA();
            processEvent(nfa, streamRecord.getValue(), getProcessingTimeService().getCurrentProcessingTime());
            updateNFA(nfa);
            return;
        }
        long timestamp = streamRecord.getTimestamp();
        Object value = streamRecord.getValue();
        if (timestamp >= this.lastWatermark) {
            saveRegisterWatermarkTimer();
            List list = (List) this.elementQueueState.get(Long.valueOf(timestamp));
            if (list == null) {
                list = new ArrayList();
            }
            if (getExecutionConfig().isObjectReuseEnabled()) {
                list.add(this.inputSerializer.copy(value));
            } else {
                list.add(streamRecord.getValue());
            }
            this.elementQueueState.put(Long.valueOf(timestamp), list);
        }
    }

    private void saveRegisterWatermarkTimer() {
        long currentWatermark = this.timerService.currentWatermark();
        if (currentWatermark + 1 > currentWatermark) {
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onEventTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFA nfa = getNFA();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek().longValue() <= this.timerService.currentWatermark()) {
            long longValue = sortedTimestamps.poll().longValue();
            Iterator it = ((List) this.elementQueueState.get(Long.valueOf(longValue))).iterator();
            while (it.hasNext()) {
                processEvent(nfa, it.next(), longValue);
            }
            this.elementQueueState.remove(Long.valueOf(longValue));
        }
        advanceTime(nfa, this.timerService.currentWatermark());
        if (sortedTimestamps.isEmpty()) {
            this.elementQueueState.clear();
        }
        updateNFA(nfa);
        if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
            saveRegisterWatermarkTimer();
        }
        updateLastSeenWatermark(this.timerService.currentWatermark());
    }

    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
    }

    private void updateLastSeenWatermark(long j) {
        this.lastWatermark = j;
    }

    private NFA<IN> getNFA() throws IOException {
        NFA<IN> nfa = (NFA) this.nfaOperatorState.value();
        return nfa != null ? nfa : this.nfaFactory.createNFA();
    }

    private void updateNFA(NFA<IN> nfa) throws IOException {
        if (nfa.isEmpty()) {
            this.nfaOperatorState.clear();
        } else {
            this.nfaOperatorState.update(nfa);
        }
    }

    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> priorityQueue = new PriorityQueue<>();
        Iterator it = this.elementQueueState.keys().iterator();
        while (it.hasNext()) {
            priorityQueue.offer((Long) it.next());
        }
        return priorityQueue;
    }

    protected abstract void processEvent(NFA<IN> nfa, IN in, long j);

    protected abstract void advanceTime(NFA<IN> nfa, long j);

    public void restoreState(FSDataInputStream fSDataInputStream) throws Exception {
        if ((fSDataInputStream instanceof Migration) && fSDataInputStream.read() == 1) {
            throw new Exception("Found UDF state but CEPOperator is not an UDF operator.");
        }
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(fSDataInputStream);
        this.timerService = getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);
        ValueState state = getRuntimeContext().getState(new ValueStateDescriptor("nfaOperatorState", new NFA.Serializer()));
        ValueState state2 = getRuntimeContext().getState(new ValueStateDescriptor("priorityQueueStateName", new PriorityQueueSerializer(new StreamElementSerializer(this.inputSerializer), new PriorityQueueStreamRecordFactory())));
        if (!this.migratingFromOldKeyedOperator) {
            ObjectInputStream objectInputStream = new ObjectInputStream(fSDataInputStream);
            NFA nfa = (NFA) objectInputStream.readObject();
            MultiplexingStreamRecordSerializer multiplexingStreamRecordSerializer = new MultiplexingStreamRecordSerializer(this.inputSerializer);
            HashMap hashMap = new HashMap();
            int readInt = objectInputStream.readInt();
            for (int i = 0; i < readInt; i++) {
                StreamRecord asRecord = multiplexingStreamRecordSerializer.deserialize(dataInputViewStreamWrapper).asRecord();
                long timestamp = asRecord.getTimestamp();
                Object value = asRecord.getValue();
                List list = (List) hashMap.get(Long.valueOf(timestamp));
                if (list == null) {
                    list = new ArrayList();
                    hashMap.put(Long.valueOf(timestamp), list);
                }
                list.add(value);
            }
            setCurrentKey((byte) 0);
            this.nfaOperatorState.update(nfa);
            for (Map.Entry entry : hashMap.entrySet()) {
                this.elementQueueState.put(entry.getKey(), entry.getValue());
            }
            if (!this.isProcessingTime) {
                setCurrentKey((byte) 0);
                saveRegisterWatermarkTimer();
            }
            objectInputStream.close();
            return;
        }
        int readInt2 = dataInputViewStreamWrapper.readInt();
        for (int i2 = 0; i2 < readInt2; i2++) {
            setCurrentKey(this.keySerializer.deserialize(dataInputViewStreamWrapper));
            saveRegisterWatermarkTimer();
            NFA nfa2 = (NFA) state.value();
            state.clear();
            this.nfaOperatorState.update(nfa2);
            PriorityQueue priorityQueue = (PriorityQueue) state2.value();
            if (priorityQueue != null && !priorityQueue.isEmpty()) {
                HashMap hashMap2 = new HashMap();
                Iterator it = priorityQueue.iterator();
                while (it.hasNext()) {
                    StreamRecord streamRecord = (StreamRecord) it.next();
                    long timestamp2 = streamRecord.getTimestamp();
                    Object value2 = streamRecord.getValue();
                    List list2 = (List) hashMap2.get(Long.valueOf(timestamp2));
                    if (list2 == null) {
                        list2 = new ArrayList();
                        hashMap2.put(Long.valueOf(timestamp2), list2);
                    }
                    list2.add(value2);
                }
                for (Map.Entry entry2 : hashMap2.entrySet()) {
                    this.elementQueueState.put(entry2.getKey(), entry2.getValue());
                }
                state2.clear();
            }
        }
    }

    @VisibleForTesting
    public boolean hasNonEmptyNFA(KEY key) throws IOException {
        setCurrentKey(key);
        return this.nfaOperatorState.value() != null;
    }

    @VisibleForTesting
    public boolean hasNonEmptyPQ(KEY key) throws Exception {
        setCurrentKey(key);
        return this.elementQueueState.keys().iterator().hasNext();
    }

    @VisibleForTesting
    public int getPQSize(KEY key) throws Exception {
        setCurrentKey(key);
        int i = 0;
        Iterator it = this.elementQueueState.values().iterator();
        while (it.hasNext()) {
            i += ((List) it.next()).size();
        }
        return i;
    }
}
