package org.apache.flink.cep.operator;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/cep/operator/CEPOperatorTest.class */
public class CEPOperatorTest extends TestLogger {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/cep/operator/CEPOperatorTest$NFAFactory.class */
    private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;

        private NFAFactory() {
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compile(Pattern.begin("start").where(new FilterFunction<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.NFAFactory.3
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("start");
                }
            }).followedBy("middle").subtype(SubEvent.class).where(new FilterFunction<SubEvent>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.NFAFactory.2
                private static final long serialVersionUID = 6215754202506583964L;

                public boolean filter(SubEvent subEvent) throws Exception {
                    return subEvent.getVolume() > 5.0d;
                }
            }).followedBy("end").where(new FilterFunction<Event>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.NFAFactory.1
                private static final long serialVersionUID = 7056763917392056548L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("end");
                }
            }).within(Time.milliseconds(10L)), Event.createTypeSerializer(), false);
        }
    }

    @Test
    public void testCEPOperatorWatermarkForwarding() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new CEPPatternOperator(Event.createTypeSerializer(), false, new NFAFactory()));
        oneInputStreamOperatorTestHarness.open();
        Watermark watermark = new Watermark(42L);
        oneInputStreamOperatorTestHarness.processWatermark(watermark);
        Object poll = oneInputStreamOperatorTestHarness.getOutput().poll();
        Assert.assertTrue(poll instanceof Watermark);
        Assert.assertEquals(watermark, poll);
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.1
            private static final long serialVersionUID = -4873366487571254798L;

            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, new NFAFactory()));
        oneInputStreamOperatorTestHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        Watermark watermark = new Watermark(42L);
        oneInputStreamOperatorTestHarness.processWatermark(watermark);
        Object poll = oneInputStreamOperatorTestHarness.getOutput().poll();
        Assert.assertTrue(poll instanceof Watermark);
        Assert.assertEquals(watermark, poll);
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testCEPOperatorCheckpointing() throws Exception {
        new KeySelector<Event, Integer>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.2
            private static final long serialVersionUID = -4873366487571254798L;

            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new CEPPatternOperator(Event.createTypeSerializer(), false, new NFAFactory()));
        oneInputStreamOperatorTestHarness.open();
        Event event = new Event(42, "start", 1.0d);
        SubEvent subEvent = new SubEvent(42, "foo", 1.0d, 10.0d);
        Event event2 = new Event(42, "end", 1.0d);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(event, 1L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness2 = new OneInputStreamOperatorTestHarness(new CEPPatternOperator(Event.createTypeSerializer(), false, new NFAFactory()));
        oneInputStreamOperatorTestHarness2.setup();
        oneInputStreamOperatorTestHarness2.restore(snapshot, 1L);
        oneInputStreamOperatorTestHarness2.open();
        oneInputStreamOperatorTestHarness2.processWatermark(new Watermark(Long.MIN_VALUE));
        oneInputStreamOperatorTestHarness2.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
        oneInputStreamOperatorTestHarness2.processWatermark(new Watermark(2L));
        StreamTaskState snapshot2 = oneInputStreamOperatorTestHarness2.snapshot(1L, 1L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness3 = new OneInputStreamOperatorTestHarness(new CEPPatternOperator(Event.createTypeSerializer(), false, new NFAFactory()));
        oneInputStreamOperatorTestHarness3.setup();
        oneInputStreamOperatorTestHarness3.restore(snapshot2, 2L);
        oneInputStreamOperatorTestHarness3.open();
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(subEvent, 3L));
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(new Event(42, "start", 1.0d), 4L));
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(event2, 5L));
        oneInputStreamOperatorTestHarness3.processWatermark(new Watermark(Long.MAX_VALUE));
        ConcurrentLinkedQueue output = oneInputStreamOperatorTestHarness3.getOutput();
        Assert.assertEquals(2L, output.size());
        Object poll = output.poll();
        Assert.assertTrue(poll instanceof StreamRecord);
        StreamRecord streamRecord = (StreamRecord) poll;
        Assert.assertTrue(streamRecord.getValue() instanceof Map);
        Map map = (Map) streamRecord.getValue();
        Assert.assertEquals(event, map.get("start"));
        Assert.assertEquals(subEvent, map.get("middle"));
        Assert.assertEquals(event2, map.get("end"));
        oneInputStreamOperatorTestHarness3.close();
    }

    @Test
    public void testKeyedCEPOperatorCheckpointing() throws Exception {
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.3
            private static final long serialVersionUID = -4873366487571254798L;

            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, new NFAFactory()));
        oneInputStreamOperatorTestHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        Event event = new Event(42, "start", 1.0d);
        SubEvent subEvent = new SubEvent(42, "foo", 1.0d, 10.0d);
        Event event2 = new Event(42, "end", 1.0d);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(event, 1L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness2 = new OneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, new NFAFactory()));
        oneInputStreamOperatorTestHarness2.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamOperatorTestHarness2.setup();
        oneInputStreamOperatorTestHarness2.restore(snapshot, 1L);
        oneInputStreamOperatorTestHarness2.open();
        oneInputStreamOperatorTestHarness2.processWatermark(new Watermark(Long.MIN_VALUE));
        oneInputStreamOperatorTestHarness2.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
        oneInputStreamOperatorTestHarness2.processWatermark(new Watermark(2L));
        StreamTaskState snapshot2 = oneInputStreamOperatorTestHarness2.snapshot(1L, 1L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness3 = new OneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, new NFAFactory()));
        oneInputStreamOperatorTestHarness3.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamOperatorTestHarness3.setup();
        oneInputStreamOperatorTestHarness3.restore(snapshot2, 2L);
        oneInputStreamOperatorTestHarness3.open();
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(subEvent, 3L));
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(new Event(42, "start", 1.0d), 4L));
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(event2, 5L));
        oneInputStreamOperatorTestHarness3.processWatermark(new Watermark(Long.MAX_VALUE));
        ConcurrentLinkedQueue output = oneInputStreamOperatorTestHarness3.getOutput();
        Assert.assertEquals(2L, output.size());
        Object poll = output.poll();
        Assert.assertTrue(poll instanceof StreamRecord);
        StreamRecord streamRecord = (StreamRecord) poll;
        Assert.assertTrue(streamRecord.getValue() instanceof Map);
        Map map = (Map) streamRecord.getValue();
        Assert.assertEquals(event, map.get("start"));
        Assert.assertEquals(subEvent, map.get("middle"));
        Assert.assertEquals(event2, map.get("end"));
        oneInputStreamOperatorTestHarness3.close();
    }

    @Test
    public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
        String absolutePath = this.tempFolder.newFolder().getAbsolutePath();
        String uri = this.tempFolder.newFolder().toURI().toString();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(uri, new MemoryStateBackend());
        rocksDBStateBackend.setDbStoragePath(absolutePath);
        KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { // from class: org.apache.flink.cep.operator.CEPOperatorTest.4
            private static final long serialVersionUID = -4873366487571254798L;

            public Integer getKey(Event event) throws Exception {
                return Integer.valueOf(event.getId());
            }
        };
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, new NFAFactory()));
        oneInputStreamOperatorTestHarness.setStateBackend(rocksDBStateBackend);
        oneInputStreamOperatorTestHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamOperatorTestHarness.open();
        Event event = new Event(42, "start", 1.0d);
        SubEvent subEvent = new SubEvent(42, "foo", 1.0d, 10.0d);
        Event event2 = new Event(42, "end", 1.0d);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(event, 1L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new Event(42, "foobar", 1.0d), 2L));
        StreamTaskState snapshot = oneInputStreamOperatorTestHarness.snapshot(0L, 0L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness2 = new OneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, new NFAFactory()));
        RocksDBStateBackend rocksDBStateBackend2 = new RocksDBStateBackend(uri, new MemoryStateBackend());
        rocksDBStateBackend2.setDbStoragePath(absolutePath);
        oneInputStreamOperatorTestHarness2.setStateBackend(rocksDBStateBackend2);
        oneInputStreamOperatorTestHarness2.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamOperatorTestHarness2.setup();
        oneInputStreamOperatorTestHarness2.restore(snapshot, 1L);
        oneInputStreamOperatorTestHarness2.open();
        oneInputStreamOperatorTestHarness2.processWatermark(new Watermark(Long.MIN_VALUE));
        oneInputStreamOperatorTestHarness2.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0d, 5.0d), 3L));
        oneInputStreamOperatorTestHarness2.processWatermark(new Watermark(2L));
        StreamTaskState snapshot2 = oneInputStreamOperatorTestHarness2.snapshot(1L, 1L);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness3 = new OneInputStreamOperatorTestHarness(new KeyedCEPPatternOperator(Event.createTypeSerializer(), false, keySelector, IntSerializer.INSTANCE, new NFAFactory()));
        RocksDBStateBackend rocksDBStateBackend3 = new RocksDBStateBackend(uri, new MemoryStateBackend());
        rocksDBStateBackend3.setDbStoragePath(absolutePath);
        oneInputStreamOperatorTestHarness3.setStateBackend(rocksDBStateBackend3);
        oneInputStreamOperatorTestHarness3.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
        oneInputStreamOperatorTestHarness3.setup();
        oneInputStreamOperatorTestHarness3.restore(snapshot2, 2L);
        oneInputStreamOperatorTestHarness3.open();
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(subEvent, 3L));
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(new Event(42, "start", 1.0d), 4L));
        oneInputStreamOperatorTestHarness3.processElement(new StreamRecord(event2, 5L));
        oneInputStreamOperatorTestHarness3.processWatermark(new Watermark(Long.MAX_VALUE));
        ConcurrentLinkedQueue output = oneInputStreamOperatorTestHarness3.getOutput();
        Assert.assertEquals(2L, output.size());
        Object poll = output.poll();
        Assert.assertTrue(poll instanceof StreamRecord);
        StreamRecord streamRecord = (StreamRecord) poll;
        Assert.assertTrue(streamRecord.getValue() instanceof Map);
        Map map = (Map) streamRecord.getValue();
        Assert.assertEquals(event, map.get("start"));
        Assert.assertEquals(subEvent, map.get("middle"));
        Assert.assertEquals(event2, map.get("end"));
        oneInputStreamOperatorTestHarness3.close();
    }
}
