package org.apache.flink.cep.operator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;

/* loaded from: input_file:org/apache/flink/cep/operator/CEPOperatorTest.class */
public class CEPOperatorTest extends TestLogger {

    /** JUnit rule providing temporary folders; the RocksDB-based tests below use one as the DB storage path. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPOperatorTest$ComplexNFAFactory.class */
    private static class ComplexNFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private ComplexNFAFactory() {
            this(false);
        }

        private ComplexNFAFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compile(Pattern.begin("start").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.ComplexNFAFactory.4
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("c");
                }
            }).followedBy("middle1").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.ComplexNFAFactory.3
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("a");
                }
            }).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.ComplexNFAFactory.2
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("b");
                }
            }).optional().followedBy("end").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.ComplexNFAFactory.1
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("a");
                }
            }).within(Time.milliseconds(10L)), Event.createTypeSerializer(), this.handleTimeout);
        }
    }

    /**
     * Orders {@link Event}s by name, then by price, then by id.
     *
     * <p>Note: this is a (non-static) inner class, so instances are tied to an enclosing test instance.
     */
    public class EventComparator implements Comparator<Event> {
        private EventComparator() {
        }

        /**
         * @param left first event
         * @param right second event
         * @return negative, zero or positive following the name / price / id ordering
         */
        @Override // java.util.Comparator
        public int compare(Event left, Event right) {
            int byName = left.getName().compareTo(right.getName());
            if (byName != 0) {
                return byName;
            }
            int byPrice = Double.compare(left.getPrice(), right.getPrice());
            if (byPrice != 0) {
                return byPrice;
            }
            // Name and price are equal: the id is the final tie-breaker.
            return Integer.compare(left.getId(), right.getId());
        }
    }

    /**
     * Orders lists of {@link Event}s: shorter lists first, equally long lists element by element
     * using {@link EventComparator}.
     */
    public class ListEventComparator implements Comparator<List<Event>> {
        private ListEventComparator() {
        }

        /**
         * @param left first list
         * @param right second list
         * @return the size comparison if the sizes differ, otherwise the first non-zero
         *     element comparison, or 0 if all elements compare equal
         */
        @Override // java.util.Comparator
        public int compare(List<Event> left, List<Event> right) {
            int bySize = Integer.compare(left.size(), right.size());
            if (bySize != 0) {
                return bySize;
            }

            // Sizes are equal here, so indexing both lists up to left.size() is safe.
            EventComparator elementOrder = new EventComparator();
            int result = 0;
            int index = 0;
            while (result == 0 && index < left.size()) {
                result = elementOrder.compare(left.get(index), right.get(index));
                index++;
            }
            return result;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cep/operator/CEPOperatorTest$NFAFactory.class */
    public static class NFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAFactory() {
            this(false);
        }

        private NFAFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compile(Pattern.begin("start").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.NFAFactory.3
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("start");
                }
            }).followedByAny("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.NFAFactory.2
                private static final long serialVersionUID = 6215754202506583964L;

                public boolean filter(SubEvent subEvent) throws Exception {
                    return subEvent.getVolume() > 5.0d;
                }
            }).followedByAny("end").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.NFAFactory.1
                private static final long serialVersionUID = 7056763917392056548L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("end");
                }
            }).within(Time.milliseconds(10L)), Event.createTypeSerializer(), this.handleTimeout);
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPOperatorTest$SimpleNFAFactory.class */
    private static class SimpleNFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private SimpleNFAFactory() {
            this(false);
        }

        private SimpleNFAFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compile(Pattern.begin("start").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.SimpleNFAFactory.3
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("c");
                }
            }).followedBy("middle").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.SimpleNFAFactory.2
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("a");
                }
            }).followedBy("end").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.SimpleNFAFactory.1
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("b");
                }
            }).within(Time.milliseconds(10L)), Event.createTypeSerializer(), this.handleTimeout);
        }
    }

    /** Runs after every test and asks Mockito to validate that the framework was used correctly. */
    @After
    public void validate() {
        Mockito.validateMockitoUsage();
    }

    /** Feeds a single watermark into the operator and checks that it is the first thing emitted. */
    @Test
    public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
        final long timestamp = 42L;
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
        try {
            harness.open();
            harness.processWatermark(new Watermark(timestamp));

            Object forwarded = harness.getOutput().poll();
            verifyWatermark(forwarded, timestamp);
        } finally {
            harness.close();
        }
    }

    /**
     * Processes a start / middle / end sequence across two snapshot-and-restore cycles and expects
     * exactly one matched pattern followed by the final watermark.
     *
     * <p>A single harness variable is reused for all three operator instances so that the
     * {@code finally} block always closes the instance that is currently live; closing only the
     * first instance on failure would leak the later ones.
     */
    @Test
    public void testKeyedCEPOperatorCheckpointing() throws Exception {
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
        try {
            harness.open();

            Event startEvent = new Event(42, "start", 1.0d);
            SubEvent middleEvent = new SubEvent(42, "foo", 1.0d, 10.0d);
            Event endEvent = new Event(42, "end", 1.0d);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0d), 2L));

            // First checkpoint, restored into a fresh operator instance.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            harness = getCepTestHarness(false);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processWatermark(new Watermark(Long.MIN_VALUE));
            harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
            harness.processWatermark(new Watermark(2L));
            harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0d), 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 5L));

            // Second checkpoint, restored into yet another fresh operator instance.
            OperatorSubtaskState snapshot2 = harness.snapshot(1L, 1L);
            harness.close();

            harness = getCepTestHarness(false);
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processWatermark(new Watermark(Long.MAX_VALUE));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(2L, result.size());
            verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
            verifyWatermark(result.poll(), Long.MAX_VALUE);
        } finally {
            harness.close();
        }
    }

    /**
     * Same scenario as the plain checkpointing test, but each operator instance runs on a
     * {@link RocksDBStateBackend} whose DB storage path is a temporary folder.
     *
     * <p>A single harness variable is reused for all three operator instances so that the
     * {@code finally} block always closes the instance that is currently live; closing only the
     * first instance on failure would leak the later ones.
     */
    @Test
    public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
        String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
        rocksDBStateBackend.setDbStoragePath(rocksDbPath);

        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
        try {
            harness.setStateBackend(rocksDBStateBackend);
            harness.open();

            Event startEvent = new Event(42, "start", 1.0d);
            SubEvent middleEvent = new SubEvent(42, "foo", 1.0d, 10.0d);
            Event endEvent = new Event(42, "end", 1.0d);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0d), 2L));

            // First checkpoint, restored into a fresh operator with a fresh backend on the same path.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            harness = getCepTestHarness(false);
            rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processWatermark(new Watermark(Long.MIN_VALUE));
            harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
            harness.processWatermark(new Watermark(2L));

            // Second checkpoint, restored the same way.
            OperatorSubtaskState snapshot2 = harness.snapshot(1L, 1L);
            harness.close();

            harness = getCepTestHarness(false);
            rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processElement(new StreamRecord<Event>(middleEvent, 3L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0d), 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 5L));
            harness.processWatermark(new Watermark(Long.MAX_VALUE));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(2L, result.size());
            verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
            verifyWatermark(result.poll(), Long.MAX_VALUE);
        } finally {
            harness.close();
        }
    }

    /**
     * Sends one "start" event followed only by watermarks (5 and 13) and expects: both watermarks
     * on the main output, and one timed-out partial match {@code {"start": [startEvent]}} with
     * timestamp 13 on the "timedOut" side output.
     */
    @Test
    public void testKeyedAdvancingTimeWithoutElements() throws Exception {
        final Event startEvent = new Event(42, "start", 1.0d);
        final long watermarkTimestamp1 = 5L;
        final long watermarkTimestamp2 = 13L;

        Map<String, List<Event>> expectedSequence = new HashMap<>(2);
        expectedSequence.put("start", Collections.singletonList(startEvent));

        // Anonymous subclass so the tag's generic type information is retained.
        OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOut =
            new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timedOut") {
            };

        // Raw types are kept on the operator/harness: the output serializer passed to setup()
        // below is a raw KryoSerializer over Map.class.
        KeyedOneInputStreamOperatorTestHarness harness = new KeyedOneInputStreamOperatorTestHarness(
            new SelectTimeoutCepOperator(
                Event.createTypeSerializer(),
                false,
                new NFAFactory(true),
                (org.apache.flink.cep.EventComparator) null,
                (AfterMatchSkipStrategy) null,
                new PatternSelectFunction<Event, Map<String, List<Event>>>() {
                    private static final long serialVersionUID = -5768297287711394420L;

                    @Override
                    public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
                        return pattern;
                    }
                },
                new PatternTimeoutFunction<Event, Tuple2<Map<String, List<Event>>, Long>>() {
                    private static final long serialVersionUID = 2843329425823093249L;

                    @Override
                    public Tuple2<Map<String, List<Event>>, Long> timeout(
                            Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
                        return Tuple2.of(pattern, Long.valueOf(timeoutTimestamp));
                    }
                },
                timedOut),
            new KeySelector<Event, Integer>() {
                private static final long serialVersionUID = 7219185117566268366L;

                @Override
                public Integer getKey(Event value) throws Exception {
                    return Integer.valueOf(value.getId());
                }
            },
            BasicTypeInfo.INT_TYPE_INFO);

        try {
            String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
            RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup(new KryoSerializer(Map.class, new ExecutionConfig()));
            harness.open();

            harness.processElement(new StreamRecord(startEvent, 3L));
            harness.processWatermark(new Watermark(watermarkTimestamp1));
            harness.processWatermark(new Watermark(watermarkTimestamp2));

            ConcurrentLinkedQueue mainOutput = harness.getOutput();
            ConcurrentLinkedQueue sideOutput = harness.getSideOutput(timedOut);
            Assert.assertEquals(2L, mainOutput.size());
            Assert.assertEquals(1L, sideOutput.size());

            Object firstWatermark = mainOutput.poll();
            Assert.assertTrue(firstWatermark instanceof Watermark);
            Assert.assertEquals(watermarkTimestamp1, ((Watermark) firstWatermark).getTimestamp());

            Tuple2 timedOutMatch = (Tuple2) ((StreamRecord) sideOutput.poll()).getValue();
            Assert.assertEquals(watermarkTimestamp2, ((Long) timedOutMatch.f1).longValue());
            Assert.assertEquals(expectedSequence, timedOutMatch.f0);

            Object secondWatermark = mainOutput.poll();
            Assert.assertTrue(secondWatermark instanceof Watermark);
            Assert.assertEquals(watermarkTimestamp2, ((Watermark) secondWatermark).getTimestamp());
        } finally {
            harness.close();
        }
    }

    /**
     * Feeds "c", then (after a restore) a non-matching "d", then (after another restore) "a" and
     * "b", and expects exactly one c/a/b match on the output.
     *
     * <p>A single harness variable is reused for all three operator instances so that the
     * {@code finally} block always closes the instance that is currently live; closing only the
     * first instance on failure would leak the later ones.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdate() throws Exception {
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
            CepOperatorTestUtilities.getCepTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()));
        try {
            harness.open();

            Event startEvent = new Event(42, "c", 1.0d);
            SubEvent middleEvent = new SubEvent(42, "a", 1.0d, 10.0d);
            Event endEvent = new Event(42, "b", 1.0d);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));

            // First checkpoint, restored into a fresh operator instance.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            harness = CepOperatorTestUtilities.getCepTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()));
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0d), 4L));

            // Second checkpoint, restored into yet another fresh operator instance.
            OperatorSubtaskState snapshot2 = harness.snapshot(0L, 0L);
            harness.close();

            harness = CepOperatorTestUtilities.getCepTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()));
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 4L));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(1L, result.size());
            verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        } finally {
            harness.close();
        }
    }

    /**
     * RocksDB variant of the NFA-update test: "c", restore, non-matching "d", restore, "a" and "b";
     * expects exactly one c/a/b match. Every operator instance gets its own
     * {@link RocksDBStateBackend} pointing at the same temporary storage path.
     *
     * <p>A single harness variable is reused for all three operator instances so that the
     * {@code finally} block always closes the instance that is currently live; closing only the
     * first instance on failure would leak the later ones.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdateWithRocksDB() throws Exception {
        String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
        rocksDBStateBackend.setDbStoragePath(rocksDbPath);

        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
            CepOperatorTestUtilities.getCepTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()));
        try {
            harness.setStateBackend(rocksDBStateBackend);
            harness.open();

            Event startEvent = new Event(42, "c", 1.0d);
            SubEvent middleEvent = new SubEvent(42, "a", 1.0d, 10.0d);
            Event endEvent = new Event(42, "b", 1.0d);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));

            // First checkpoint, restored into a fresh operator with a fresh backend on the same path.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            harness = CepOperatorTestUtilities.getCepTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()));
            rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0d), 4L));

            // Second checkpoint, restored the same way.
            OperatorSubtaskState snapshot2 = harness.snapshot(0L, 0L);
            harness.close();

            harness = CepOperatorTestUtilities.getCepTestHarness(
                CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()));
            rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);
            harness.setStateBackend(rocksDBStateBackend);
            harness.setup();
            harness.initializeState(snapshot2);
            harness.open();

            harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 4L));

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(1L, result.size());
            verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        } finally {
            harness.close();
        }
    }

    /**
     * Replaces the operator's {@code nfaOperatorState} field with a Mockito spy and checks that,
     * for the sequence c, d, a, b, the state's {@code update} is invoked exactly twice while one
     * c/a/b match is emitted.
     *
     * <p>Cleanup happens in a {@code finally} block so the harness is closed exactly once on
     * every path.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdateTimes() throws Exception {
        SelectCepOperator operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
            CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.open();

            // The state field only exists after open(); swap it for a spy via reflection.
            ValueState nfaOperatorStateSpy =
                Mockito.spy((ValueState) Whitebox.getInternalState(operator, "nfaOperatorState"));
            Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);

            Event startEvent = new Event(42, "c", 1.0d);
            SubEvent middleEvent = new SubEvent(42, "a", 1.0d, 10.0d);
            Event endEvent = new Event(42, "b", 1.0d);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0d), 4L));
            harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 4L));

            Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any());

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(1L, result.size());
            verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        } finally {
            harness.close();
        }
    }

    /**
     * RocksDB variant of the update-count test: with the operator's {@code nfaOperatorState}
     * field replaced by a Mockito spy, the sequence c, d, a, b must trigger exactly two
     * {@code update} calls and emit one c/a/b match.
     *
     * <p>Cleanup happens in a {@code finally} block so the harness is closed exactly once on
     * every path.
     */
    @Test
    public void testKeyedCEPOperatorNFAUpdateTimesWithRocksDB() throws Exception {
        String rocksDbPath = this.tempFolder.newFolder().getAbsolutePath();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
        rocksDBStateBackend.setDbStoragePath(rocksDbPath);

        SelectCepOperator operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
            CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.setStateBackend(rocksDBStateBackend);
            harness.open();

            // The state field only exists after open(); swap it for a spy via reflection.
            ValueState nfaOperatorStateSpy =
                Mockito.spy((ValueState) Whitebox.getInternalState(operator, "nfaOperatorState"));
            Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);

            Event startEvent = new Event(42, "c", 1.0d);
            SubEvent middleEvent = new SubEvent(42, "a", 1.0d, 10.0d);
            Event endEvent = new Event(42, "b", 1.0d);

            harness.processElement(new StreamRecord<Event>(startEvent, 1L));
            harness.processElement(new StreamRecord<Event>(new Event(42, "d", 1.0d), 4L));
            harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
            harness.processElement(new StreamRecord<Event>(endEvent, 4L));

            Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any());

            ConcurrentLinkedQueue<Object> result = harness.getOutput();
            Assert.assertEquals(1L, result.size());
            verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        } finally {
            harness.close();
        }
    }

    /**
     * Event-time cleanup test: tracks the number of registered event-time timers, the per-key
     * priority-queue size and whether a key has a non-empty NFA while elements and watermarks are
     * processed, including one snapshot/restore in the middle.
     *
     * <p>The harness variable is reassigned after the restore, and the {@code finally} block
     * closes whichever instance is live.
     */
    @Test
    public void testCEPOperatorCleanupEventTime() throws Exception {
        Event startEvent1 = new Event(42, "start", 1.0d);
        Event startEvent2 = new Event(42, "start", 2.0d);
        SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0d, 10.0d);
        SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0d, 10.0d);
        SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0d, 10.0d);
        Event endEvent1 = new Event(42, "end", 1.0d);
        Event endEvent2 = new Event(42, "end", 2.0d);
        Event startEventK2 = new Event(43, "start", 1.0d);

        SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(false);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
            CepOperatorTestUtilities.getCepTestHarness(operator);
        try {
            harness.open();

            harness.processWatermark(new Watermark(Long.MIN_VALUE));

            harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0d), 2L));
            harness.processElement(new StreamRecord<Event>(middleEvent1, 2L));
            harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
            harness.processElement(new StreamRecord<Event>(startEvent1, 1L));
            harness.processElement(new StreamRecord<Event>(startEventK2, 1L));

            // Nothing has been advanced by a watermark yet: elements are only queued per key.
            Assert.assertEquals(2L, harness.numEventTimeTimers());
            Assert.assertEquals(4L, operator.getPQSize(42));
            Assert.assertEquals(1L, operator.getPQSize(43));
            Assert.assertFalse(operator.hasNonEmptyNFA(42));
            Assert.assertFalse(operator.hasNonEmptyNFA(43));

            harness.processWatermark(new Watermark(2L));

            verifyWatermark(harness.getOutput().poll(), Long.MIN_VALUE);
            verifyWatermark(harness.getOutput().poll(), 2L);

            Assert.assertEquals(2L, harness.numEventTimeTimers());
            Assert.assertTrue(operator.hasNonEmptyNFA(42));
            Assert.assertEquals(1L, operator.getPQSize(42));
            Assert.assertTrue(operator.hasNonEmptyNFA(43));
            Assert.assertFalse(operator.hasNonEmptyPQ(43));

            harness.processElement(new StreamRecord<Event>(startEvent2, 4L));
            harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));

            // Snapshot and restore into a fresh operator instance.
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            harness.close();

            SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(false);
            harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
            harness.setup();
            harness.initializeState(snapshot);
            harness.open();

            harness.processElement(new StreamRecord<Event>(endEvent1, 6L));
            harness.processWatermark(11L);
            harness.processWatermark(12L);

            Assert.assertEquals(1L, harness.numEventTimeTimers());
            Assert.assertTrue(operator2.hasNonEmptyNFA(42));
            Assert.assertFalse(operator2.hasNonEmptyPQ(42));
            Assert.assertFalse(operator2.hasNonEmptyNFA(43));
            Assert.assertFalse(operator2.hasNonEmptyPQ(43));

            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent2, endEvent1);
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent1);
            verifyWatermark(harness.getOutput().poll(), 11L);
            verifyWatermark(harness.getOutput().poll(), 12L);

            harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
            harness.processElement(new StreamRecord<Event>(endEvent2, 13L));
            harness.processWatermark(20L);
            harness.processWatermark(21L);

            // After the last watermarks all per-key state and timers are expected to be gone.
            Assert.assertFalse(operator2.hasNonEmptyNFA(42));
            Assert.assertFalse(operator2.hasNonEmptyPQ(42));
            Assert.assertEquals(0L, harness.numEventTimeTimers());

            Assert.assertEquals(3L, harness.getOutput().size());
            verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
            verifyWatermark(harness.getOutput().poll(), 20L);
            verifyWatermark(harness.getOutput().poll(), 21L);
        } finally {
            harness.close();
        }
    }

    /**
     * Exercises buffering and cleanup when elements arrive out of timestamp order
     * and the same {@code Event} instance is submitted twice with one timestamp.
     *
     * <p>Every event uses {@code 41} as its first constructor argument, and the same
     * value is used to query the operator's per-key queue size and NFA state. The
     * operator is built from {@code ComplexNFAFactory}, which is declared outside
     * this method, so the exact pattern is not visible here.
     *
     * @throws Exception if the harness or operator fails; the harness is closed before rethrowing
     */
    @Test
    public void testCEPOperatorCleanupEventTimeWithSameElements() throws Exception {
        Event event = new Event(41, "c", 1.0d);
        Event event2 = new Event(41, "a", 2.0d);
        Event event3 = new Event(41, "a", 3.0d);
        Event event4 = new Event(41, "a", 4.0d);
        Event event5 = new Event(41, "b", 5.0d);
        // NOTE(review): the boolean flag is passed as false here and in the other
        // watermark-driven tests; presumably it selects event time - confirm in CepOperatorTestUtilities.
        SelectCepOperator keyedCepOpearator = CepOperatorTestUtilities.getKeyedCepOpearator(false, new ComplexNFAFactory());
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(keyedCepOpearator);
        try {
            cepTestHarness.open();
            cepTestHarness.processWatermark(new Watermark(Long.MIN_VALUE));
            // Seven records are submitted with timestamps 6, 7, 1, 3, 3, 3, 5 (not in order);
            // event2 is submitted twice with timestamp 3.
            cepTestHarness.processElement(new StreamRecord(event5, 6L));
            cepTestHarness.processElement(new StreamRecord(event4, 7L));
            cepTestHarness.processElement(new StreamRecord(event, 1L));
            cepTestHarness.processElement(new StreamRecord(event2, 3L));
            cepTestHarness.processElement(new StreamRecord(event3, 3L));
            cepTestHarness.processElement(new StreamRecord(event2, 3L));
            cepTestHarness.processElement(new StreamRecord(new Event(41, "d", 6.0d), 5L));
            // Before any further watermark: one timer, all seven records counted by
            // getPQSize(41), and no NFA state for key 41.
            Assert.assertEquals(1L, cepTestHarness.numEventTimeTimers());
            Assert.assertEquals(7L, keyedCepOpearator.getPQSize(41));
            Assert.assertTrue(!keyedCepOpearator.hasNonEmptyNFA(41));
            cepTestHarness.processWatermark(new Watermark(2L));
            // Both watermarks are expected in the output, in the order they were processed.
            verifyWatermark(cepTestHarness.getOutput().poll(), Long.MIN_VALUE);
            verifyWatermark(cepTestHarness.getOutput().poll(), 2L);
            // After watermark 2 the queue size is expected to drop from 7 to 6 and
            // NFA state to exist for key 41; one timer is still registered.
            Assert.assertEquals(1L, cepTestHarness.numEventTimeTimers());
            Assert.assertEquals(6L, keyedCepOpearator.getPQSize(41));
            Assert.assertTrue(keyedCepOpearator.hasNonEmptyNFA(41));
            cepTestHarness.processWatermark(new Watermark(8L));
            // Drain the output: any watermark must carry timestamp 8; every other element
            // is treated as a match whose map values are flattened into a single list.
            ArrayList arrayList = new ArrayList();
            while (!cepTestHarness.getOutput().isEmpty()) {
                Object poll = cepTestHarness.getOutput().poll();
                if (poll instanceof Watermark) {
                    verifyWatermark(poll, 8L);
                } else {
                    StreamRecord streamRecord = (StreamRecord) poll;
                    ArrayList arrayList2 = new ArrayList();
                    Iterator it = ((Map) streamRecord.getValue()).values().iterator();
                    while (it.hasNext()) {
                        arrayList2.addAll((List) it.next());
                    }
                    arrayList.add(arrayList2);
                }
            }
            // Eight matches are expected; compareMaps sorts both sides before comparing,
            // so neither the match order nor the order inside a match matters here.
            compareMaps(arrayList, Lists.newArrayList(new List[]{Lists.newArrayList(new Event[]{event, event2}), Lists.newArrayList(new Event[]{event, event2, event3}), Lists.newArrayList(new Event[]{event, event5, event4}), Lists.newArrayList(new Event[]{event, event2, event3, event2}), Lists.newArrayList(new Event[]{event, event2, event5, event4}), Lists.newArrayList(new Event[]{event, event2, event2, event3, event4}), Lists.newArrayList(new Event[]{event, event2, event3, event5, event4}), Lists.newArrayList(new Event[]{event, event2, event2, event3, event5, event4})}));
            // After watermark 8 the queue is expected to be empty while NFA state and
            // one timer remain.
            Assert.assertEquals(1L, cepTestHarness.numEventTimeTimers());
            Assert.assertEquals(0L, keyedCepOpearator.getPQSize(41));
            Assert.assertTrue(keyedCepOpearator.hasNonEmptyNFA(41));
            cepTestHarness.processWatermark(new Watermark(17L));
            verifyWatermark(cepTestHarness.getOutput().poll(), 17L);
            // After watermark 17 everything for key 41 is expected to be cleaned up:
            // no NFA state, no queued elements, no timers.
            Assert.assertTrue(!keyedCepOpearator.hasNonEmptyNFA(41));
            Assert.assertTrue(!keyedCepOpearator.hasNonEmptyPQ(41));
            Assert.assertEquals(0L, cepTestHarness.numEventTimeTimers());
            cepTestHarness.close();
        } catch (Throwable th) {
            // Close the harness on any failure, then propagate the original error.
            cepTestHarness.close();
            throw th;
        }
    }

    /**
     * Exercises matching and state cleanup when the harness is driven by
     * {@code setProcessingTime} rather than by watermarks, including a
     * snapshot/restore into a fresh operator in the middle of the sequence.
     *
     * <p>The pattern comes from {@code NFAFactory}, which is declared outside this
     * method; the expected matches below use the map keys checked by
     * {@code verifyPattern} ("start", "middle", "end").
     *
     * @throws Exception if the harness or operator fails; the harness is closed before rethrowing
     */
    @Test
    public void testCEPOperatorCleanupProcessingTime() throws Exception {
        Event event = new Event(42, "start", 1.0d);
        Event event2 = new Event(42, "start", 2.0d);
        SubEvent subEvent = new SubEvent(42, "foo1", 1.0d, 10.0d);
        SubEvent subEvent2 = new SubEvent(42, "foo2", 1.0d, 10.0d);
        SubEvent subEvent3 = new SubEvent(42, "foo3", 1.0d, 10.0d);
        Event event3 = new Event(42, "end", 1.0d);
        Event event4 = new Event(42, "end", 2.0d);
        Event event5 = new Event(43, "start", 1.0d);
        // NOTE(review): flag is true here and in the other setProcessingTime-driven
        // tests; presumably it selects processing time - confirm in CepOperatorTestUtilities.
        SelectCepOperator<Event, Integer, Map<String, List<Event>>> keyedCepOperator = getKeyedCepOperator(true);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(keyedCepOperator);
        try {
            cepTestHarness.open();
            cepTestHarness.setProcessingTime(0L);
            cepTestHarness.processElement(new StreamRecord(event, 1L));
            cepTestHarness.processElement(new StreamRecord(event5, 1L));
            cepTestHarness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
            cepTestHarness.processElement(new StreamRecord(subEvent, 2L));
            cepTestHarness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
            // Right after the elements are processed, nothing is expected to be queued
            // for either key, while NFA state is expected for both 42 and 43.
            Assert.assertTrue(!keyedCepOperator.hasNonEmptyPQ(42));
            Assert.assertTrue(!keyedCepOperator.hasNonEmptyPQ(43));
            Assert.assertTrue(keyedCepOperator.hasNonEmptyNFA(42));
            Assert.assertTrue(keyedCepOperator.hasNonEmptyNFA(43));
            cepTestHarness.setProcessingTime(3L);
            cepTestHarness.processElement(new StreamRecord(event2, 3L));
            cepTestHarness.processElement(new StreamRecord(subEvent2, 4L));
            // Snapshot the state, close the first harness, and restore the snapshot
            // into a newly created operator and harness.
            OperatorSubtaskState snapshot = cepTestHarness.snapshot(0L, 0L);
            cepTestHarness.close();
            SelectCepOperator<Event, Integer, Map<String, List<Event>>> keyedCepOperator2 = getKeyedCepOperator(true);
            cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(keyedCepOperator2);
            cepTestHarness.setup();
            cepTestHarness.initializeState(snapshot);
            cepTestHarness.open();
            cepTestHarness.setProcessingTime(3L);
            // The first "end" event is expected to complete three matches built from
            // elements that were processed before the snapshot.
            cepTestHarness.processElement(new StreamRecord(event3, 5L));
            verifyPattern(cepTestHarness.getOutput().poll(), event, subEvent, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event, subEvent2, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event2, subEvent2, event3);
            cepTestHarness.setProcessingTime(11L);
            cepTestHarness.processElement(new StreamRecord(subEvent3, 11L));
            cepTestHarness.processElement(new StreamRecord(event4, 12L));
            // At processing time 11 only matches starting with event2 are expected;
            // presumably those starting with event have timed out - the time window
            // is defined by NFAFactory, which is not visible here.
            verifyPattern(cepTestHarness.getOutput().poll(), event2, subEvent2, event4);
            verifyPattern(cepTestHarness.getOutput().poll(), event2, subEvent3, event4);
            cepTestHarness.setProcessingTime(21L);
            Assert.assertTrue(keyedCepOperator2.hasNonEmptyNFA(42));
            cepTestHarness.processElement(new StreamRecord(event, 21L));
            Assert.assertTrue(keyedCepOperator2.hasNonEmptyNFA(42));
            cepTestHarness.setProcessingTime(49L);
            // One more element at processing time 49 is expected to leave no NFA state
            // for key 42, no timers, and no queued elements for either key.
            cepTestHarness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
            Assert.assertTrue(!keyedCepOperator2.hasNonEmptyNFA(42));
            Assert.assertEquals(0L, cepTestHarness.numEventTimeTimers());
            Assert.assertTrue(!keyedCepOperator2.hasNonEmptyPQ(42));
            Assert.assertTrue(!keyedCepOperator2.hasNonEmptyPQ(43));
            cepTestHarness.close();
        } catch (Throwable th) {
            // Close whichever harness is current on failure, then propagate the error.
            cepTestHarness.close();
            throw th;
        }
    }

    /**
     * Runs a three-stage pattern with an iterative "middle" condition on a harness
     * whose state backend is a {@code RocksDBStateBackend}, and compares the set of
     * emitted matches against a fixed expectation.
     *
     * <p>The pattern built below is: "start" (name equals "start"), followed by
     * "middle" (a {@code SubEvent} accepted by the iterative condition, with
     * {@code oneOrMore().allowCombinations()}), followed by "end" (name equals "end").
     *
     * @throws Exception if the harness or operator fails; the harness is closed before rethrowing
     */
    @Test
    public void testCEPOperatorSerializationWRocksDB() throws Exception {
        // RocksDB storage path is a fresh folder from the tempFolder rule; the backend
        // is constructed around a MemoryStateBackend.
        String absolutePath = this.tempFolder.newFolder().getAbsolutePath();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
        rocksDBStateBackend.setDbStoragePath(absolutePath);
        Event event = new Event(40, "start", 1.0d);
        Event event2 = new Event(40, "start", 2.0d);
        SubEvent subEvent = new SubEvent(40, "foo1", 1.0d, 10.0d);
        SubEvent subEvent2 = new SubEvent(40, "foo2", 2.0d, 10.0d);
        SubEvent subEvent3 = new SubEvent(40, "foo3", 3.0d, 10.0d);
        SubEvent subEvent4 = new SubEvent(40, "foo4", 1.0d, 10.0d);
        Event event3 = new Event(40, "next-one", 1.0d);
        Event event4 = new Event(40, "end", 1.0d);
        final Pattern where = Pattern.begin("start").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.7
            private static final long serialVersionUID = 5726188262756267490L;

            // Accepts events whose name is exactly "start".
            public boolean filter(Event event5) throws Exception {
                return event5.getName().equals("start");
            }
        }).followedBy("middle").subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.6
            private static final long serialVersionUID = 6215754202506583964L;

            // Accepts a SubEvent whose name starts with "foo" only while the sum of the
            // prices of the events returned for "middle" plus this event's price stays below 5.0.
            public boolean filter(SubEvent subEvent5, IterativeCondition.Context<SubEvent> context) throws Exception {
                if (!subEvent5.getName().startsWith("foo")) {
                    return false;
                }
                double d = 0.0d;
                Iterator it = context.getEventsForPattern("middle").iterator();
                while (it.hasNext()) {
                    d += ((Event) it.next()).getPrice();
                }
                return Double.compare(d + subEvent5.getPrice(), 5.0d) < 0;
            }

            // Bridge overload: casts its arguments and delegates to the typed filter above.
            public /* bridge */ /* synthetic */ boolean filter(Object obj, IterativeCondition.Context context) throws Exception {
                return filter((SubEvent) obj, (IterativeCondition.Context<SubEvent>) context);
            }
        }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.5
            private static final long serialVersionUID = 7056763917392056548L;

            // Accepts events whose name is exactly "end".
            public boolean filter(Event event5) throws Exception {
                return event5.getName().equals("end");
            }
        });
        // The operator gets an NFA factory that compiles the pattern above on demand.
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFACompiler.NFAFactory<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.8
            private static final long serialVersionUID = 477082663248051994L;

            public NFA<Event> createNFA() {
                return NFACompiler.compile(where, Event.createTypeSerializer(), false);
            }
        }));
        try {
            // The state backend is installed before the harness is opened.
            cepTestHarness.setStateBackend(rocksDBStateBackend);
            cepTestHarness.open();
            cepTestHarness.processWatermark(0L);
            cepTestHarness.processElement(new StreamRecord(event, 1L));
            cepTestHarness.processElement(new StreamRecord(subEvent, 2L));
            cepTestHarness.processWatermark(2L);
            // Elements between watermarks are submitted out of timestamp order (5, 3, 4).
            cepTestHarness.processElement(new StreamRecord(subEvent3, 5L));
            cepTestHarness.processElement(new StreamRecord(subEvent2, 3L));
            cepTestHarness.processElement(new StreamRecord(event2, 4L));
            cepTestHarness.processWatermark(5L);
            cepTestHarness.processElement(new StreamRecord(event3, 7L));
            cepTestHarness.processElement(new StreamRecord(event4, 8L));
            cepTestHarness.processElement(new StreamRecord(subEvent4, 6L));
            cepTestHarness.processWatermark(100L);
            // Drain the output, skipping watermarks; each remaining element is treated as
            // a match whose map values are flattened into a single list.
            ArrayList arrayList = new ArrayList();
            while (!cepTestHarness.getOutput().isEmpty()) {
                Object poll = cepTestHarness.getOutput().poll();
                if (!(poll instanceof Watermark)) {
                    StreamRecord streamRecord = (StreamRecord) poll;
                    ArrayList arrayList2 = new ArrayList();
                    Iterator it = ((Map) streamRecord.getValue()).values().iterator();
                    while (it.hasNext()) {
                        arrayList2.addAll((List) it.next());
                    }
                    arrayList.add(arrayList2);
                }
            }
            // Seven matches are expected; compareMaps sorts both sides before comparing,
            // so the listing order here is not significant.
            compareMaps(arrayList, Lists.newArrayList(new List[]{Lists.newArrayList(new Event[]{event, event4, subEvent, subEvent2, subEvent4}), Lists.newArrayList(new Event[]{event, event4, subEvent2, subEvent}), Lists.newArrayList(new Event[]{event, event4, subEvent3, subEvent}), Lists.newArrayList(new Event[]{event2, event4, subEvent3, subEvent4}), Lists.newArrayList(new Event[]{event, event4, subEvent4, subEvent}), Lists.newArrayList(new Event[]{event, event4, subEvent}), Lists.newArrayList(new Event[]{event2, event4, subEvent3})}));
            cepTestHarness.close();
        } catch (Throwable th) {
            // Close the harness on any failure, then propagate the original error.
            cepTestHarness.close();
            throw th;
        }
    }

    /**
     * Checks the order of emitted matches for an operator created by
     * {@code getKeyedCepOperatorWithComparator(true)}, driving the harness with
     * {@code setProcessingTime} and restoring from a snapshot part-way through.
     *
     * <p>Elements that share a timestamp are submitted in descending price order;
     * the expected matches are listed in ascending price order.
     *
     * @throws Exception if the harness or operator fails; the harness is closed before rethrowing
     */
    @Test
    public void testCEPOperatorComparatorProcessTime() throws Exception {
        Event event = new Event(42, "start", 1.0d);
        Event event2 = new Event(42, "start", 2.0d);
        SubEvent subEvent = new SubEvent(42, "foo1", 3.0d, 10.0d);
        SubEvent subEvent2 = new SubEvent(42, "foo2", 4.0d, 10.0d);
        Event event3 = new Event(42, "end", 1.0d);
        Event event4 = new Event(43, "start", 1.0d);
        SelectCepOperator<Event, Integer, Map<String, List<Event>>> keyedCepOperatorWithComparator = getKeyedCepOperatorWithComparator(true);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(keyedCepOperatorWithComparator);
        try {
            cepTestHarness.open();
            cepTestHarness.setProcessingTime(0L);
            cepTestHarness.processElement(new StreamRecord(event, 0L));
            cepTestHarness.processElement(new StreamRecord(event4, 0L));
            cepTestHarness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 0L));
            cepTestHarness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 0L));
            // While processing time is still 0, no NFA state is expected for either key.
            Assert.assertTrue(!keyedCepOperatorWithComparator.hasNonEmptyNFA(42));
            Assert.assertTrue(!keyedCepOperatorWithComparator.hasNonEmptyNFA(43));
            cepTestHarness.setProcessingTime(3L);
            // Once processing time advances to 3, NFA state is expected for both keys.
            Assert.assertTrue(keyedCepOperatorWithComparator.hasNonEmptyNFA(42));
            Assert.assertTrue(keyedCepOperatorWithComparator.hasNonEmptyNFA(43));
            // Submitted in descending price order: 4.0, 3.0, 2.0.
            cepTestHarness.processElement(new StreamRecord(subEvent2, 3L));
            cepTestHarness.processElement(new StreamRecord(subEvent, 3L));
            cepTestHarness.processElement(new StreamRecord(event2, 3L));
            // Snapshot the state, close the first harness, and restore into a harness
            // built around a newly created comparator-enabled operator.
            OperatorSubtaskState snapshot = cepTestHarness.snapshot(0L, 0L);
            cepTestHarness.close();
            cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(getKeyedCepOperatorWithComparator(true));
            cepTestHarness.setup();
            cepTestHarness.initializeState(snapshot);
            cepTestHarness.open();
            cepTestHarness.setProcessingTime(4L);
            cepTestHarness.processElement(new StreamRecord(event3, 5L));
            cepTestHarness.setProcessingTime(5L);
            // Four matches are expected in this exact order (output is polled sequentially).
            verifyPattern(cepTestHarness.getOutput().poll(), event, subEvent, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event, subEvent2, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event2, subEvent, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event2, subEvent2, event3);
            cepTestHarness.close();
        } catch (Throwable th) {
            // Close whichever harness is current on failure, then propagate the error.
            cepTestHarness.close();
            throw th;
        }
    }

    /**
     * Checks the order of emitted matches for an operator created by
     * {@code getKeyedCepOperatorWithComparator(false)}, driving the harness with
     * watermarks and restoring from a snapshot part-way through.
     *
     * <p>The two sub-events that share timestamp 5 are submitted with the higher
     * price first; the expected matches list the lower-priced one first.
     *
     * @throws Exception if the harness or operator fails; the harness is closed before rethrowing
     */
    @Test
    public void testCEPOperatorComparatorEventTime() throws Exception {
        Event event = new Event(42, "start", 1.0d);
        Event event2 = new Event(42, "start", 2.0d);
        SubEvent subEvent = new SubEvent(42, "foo1", 1.0d, 10.0d);
        SubEvent subEvent2 = new SubEvent(42, "foo2", 2.0d, 10.0d);
        Event event3 = new Event(42, "end", 1.0d);
        Event event4 = new Event(43, "start", 1.0d);
        SelectCepOperator<Event, Integer, Map<String, List<Event>>> keyedCepOperatorWithComparator = getKeyedCepOperatorWithComparator(false);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(keyedCepOperatorWithComparator);
        try {
            cepTestHarness.open();
            cepTestHarness.processWatermark(0L);
            cepTestHarness.processElement(new StreamRecord(event, 1L));
            cepTestHarness.processElement(new StreamRecord(event4, 1L));
            cepTestHarness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
            cepTestHarness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
            // Before the watermark advances, elements are expected to be queued for both
            // keys and no NFA state is expected yet.
            Assert.assertTrue(keyedCepOperatorWithComparator.hasNonEmptyPQ(42));
            Assert.assertTrue(keyedCepOperatorWithComparator.hasNonEmptyPQ(43));
            Assert.assertTrue(!keyedCepOperatorWithComparator.hasNonEmptyNFA(42));
            Assert.assertTrue(!keyedCepOperatorWithComparator.hasNonEmptyNFA(43));
            cepTestHarness.processWatermark(3L);
            // After watermark 3 the queues are expected to be empty and NFA state is
            // expected for both keys.
            Assert.assertTrue(!keyedCepOperatorWithComparator.hasNonEmptyPQ(42));
            Assert.assertTrue(!keyedCepOperatorWithComparator.hasNonEmptyPQ(43));
            Assert.assertTrue(keyedCepOperatorWithComparator.hasNonEmptyNFA(42));
            Assert.assertTrue(keyedCepOperatorWithComparator.hasNonEmptyNFA(43));
            cepTestHarness.processElement(new StreamRecord(event2, 4L));
            // Same timestamp, higher price (2.0) submitted before lower price (1.0).
            cepTestHarness.processElement(new StreamRecord(subEvent2, 5L));
            cepTestHarness.processElement(new StreamRecord(subEvent, 5L));
            // Snapshot the state, close the first harness, and restore into a harness
            // built around a newly created comparator-enabled operator.
            OperatorSubtaskState snapshot = cepTestHarness.snapshot(0L, 0L);
            cepTestHarness.close();
            cepTestHarness = CepOperatorTestUtilities.getCepTestHarness(getKeyedCepOperatorWithComparator(false));
            cepTestHarness.setup();
            cepTestHarness.initializeState(snapshot);
            cepTestHarness.open();
            cepTestHarness.processElement(new StreamRecord(event3, 6L));
            cepTestHarness.processWatermark(6L);
            // Four matches are expected in this exact order, followed by the watermark.
            verifyPattern(cepTestHarness.getOutput().poll(), event, subEvent, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event, subEvent2, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event2, subEvent, event3);
            verifyPattern(cepTestHarness.getOutput().poll(), event2, subEvent2, event3);
            verifyWatermark(cepTestHarness.getOutput().poll(), 6L);
            cepTestHarness.close();
        } catch (Throwable th) {
            // Close whichever harness is current on failure, then propagate the error.
            cepTestHarness.close();
            throw th;
        }
    }

    /**
     * Asserts that an output element is a {@link Watermark} carrying the given timestamp.
     *
     * @param obj element taken from the harness output
     * @param j timestamp the watermark is expected to carry
     */
    private void verifyWatermark(Object obj, long j) {
        boolean isWatermark = obj instanceof Watermark;
        Assert.assertTrue(isWatermark);
        Watermark watermark = (Watermark) obj;
        long actualTimestamp = watermark.getTimestamp();
        Assert.assertEquals(j, actualTimestamp);
    }

    /**
     * Asserts that an output element is a {@link StreamRecord} whose value is a map
     * in which the first entry of the "start", "middle" and "end" lists equals the
     * given events, checked in that order.
     *
     * @param obj element taken from the harness output
     * @param event expected first element under the "start" key
     * @param subEvent expected first element under the "middle" key
     * @param event2 expected first element under the "end" key
     */
    private void verifyPattern(Object obj, Event event, SubEvent subEvent, Event event2) {
        Assert.assertTrue(obj instanceof StreamRecord);
        StreamRecord record = (StreamRecord) obj;
        Object payload = record.getValue();
        Assert.assertTrue(payload instanceof Map);
        Map match = (Map) payload;

        // Each stage is looked up and checked before the next one is touched, so a
        // missing stage fails only after the earlier stages were verified.
        List startEvents = (List) match.get("start");
        Object firstStart = startEvents.get(0);
        Assert.assertEquals(event, firstStart);

        List middleEvents = (List) match.get("middle");
        Object firstMiddle = middleEvents.get(0);
        Assert.assertEquals(subEvent, firstMiddle);

        List endEvents = (List) match.get("end");
        Object firstEnd = endEvents.get(0);
        Assert.assertEquals(event2, firstEnd);
    }

    /**
     * Creates a keyed CEP operator backed by a new {@code NFAFactory}.
     *
     * @param z flag forwarded unchanged to {@code CepOperatorTestUtilities.getKeyedCepOpearator}
     * @return the operator returned by the test utility
     */
    private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperator(boolean z) {
        NFAFactory nfaFactory = new NFAFactory();
        return CepOperatorTestUtilities.getKeyedCepOpearator(z, nfaFactory);
    }

    /**
     * Creates a keyed CEP operator backed by a new {@code NFAFactory} and an event
     * comparator that orders events by {@code getPrice()}.
     *
     * @param z flag forwarded unchanged to {@code CepOperatorTestUtilities.getKeyedCepOpearator}
     * @return the operator returned by the test utility
     */
    private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperatorWithComparator(boolean z) {
        // The factory is created before the comparator, matching argument evaluation order.
        NFAFactory nfaFactory = new NFAFactory();
        org.apache.flink.cep.EventComparator<Event> byPrice = new org.apache.flink.cep.EventComparator<Event>() {
            @Override
            public int compare(Event left, Event right) {
                double leftPrice = left.getPrice();
                double rightPrice = right.getPrice();
                return Double.compare(leftPrice, rightPrice);
            }
        };
        return CepOperatorTestUtilities.getKeyedCepOpearator(z, nfaFactory, byPrice);
    }

    /**
     * Asserts that two collections of matches contain the same matches, ignoring
     * both the order of matches and the order of events inside each match.
     *
     * <p>The sizes are compared first. Every inner list is then sorted with a new
     * {@code EventComparator}, both outer lists are sorted with a new
     * {@code ListEventComparator}, and the results are compared element by element.
     * Sorting happens in place, so both arguments are modified.
     *
     * @param list the actual matches
     * @param list2 the expected matches
     */
    private void compareMaps(List<List<Event>> list, List<List<Event>> list2) {
        Assert.assertEquals(list2.size(), list.size());

        for (List<Event> actualMatch : list) {
            Collections.sort(actualMatch, new EventComparator());
        }
        for (List<Event> expectedMatch : list2) {
            Collections.sort(expectedMatch, new EventComparator());
        }

        Collections.sort(list, new ListEventComparator());
        Collections.sort(list2, new ListEventComparator());

        Object[] expectedMatches = list2.toArray();
        Object[] actualMatches = list.toArray();
        Assert.assertArrayEquals(expectedMatches, actualMatches);
    }

    /**
     * Creates a test harness around a keyed CEP operator obtained from
     * {@code getKeyedCepOpearator(z)}.
     *
     * @param z flag forwarded unchanged to {@code getKeyedCepOpearator}
     * @return the harness returned by {@code CepOperatorTestUtilities.getCepTestHarness}
     * @throws Exception if the harness cannot be created
     */
    private OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(boolean z) throws Exception {
        SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOpearator(z);
        return CepOperatorTestUtilities.getCepTestHarness(operator);
    }

    /**
     * Creates a keyed CEP operator backed by a new {@code NFAFactory}.
     *
     * @param z flag forwarded unchanged to {@code CepOperatorTestUtilities.getKeyedCepOpearator}
     * @return the operator returned by the test utility
     */
    private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator(boolean z) {
        NFAFactory nfaFactory = new NFAFactory();
        return CepOperatorTestUtilities.getKeyedCepOpearator(z, nfaFactory);
    }
}
