package org.apache.flink.cep.operator;

import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cep/operator/CEPRescalingTest.class */
public class CEPRescalingTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cep/operator/CEPRescalingTest$NFAFactory.class */
    public static class NFAFactory implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAFactory() {
            this(false);
        }

        private NFAFactory(boolean z) {
            this.handleTimeout = z;
        }

        public NFA<Event> createNFA() {
            return NFACompiler.compile(Pattern.begin("start").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPRescalingTest.NFAFactory.3
                private static final long serialVersionUID = 5726188262756267490L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("start");
                }
            }).followedBy("middle").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() { // from class: org.apache.flink.cep.operator.CEPRescalingTest.NFAFactory.2
                private static final long serialVersionUID = 6215754202506583964L;

                public boolean filter(SubEvent subEvent) throws Exception {
                    return subEvent.getVolume() > 5.0d;
                }
            }).followedBy("end").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.operator.CEPRescalingTest.NFAFactory.1
                private static final long serialVersionUID = 7056763917392056548L;

                public boolean filter(Event event) throws Exception {
                    return event.getName().equals("end");
                }
            }).within(Time.milliseconds(10L)), Event.createTypeSerializer(), this.handleTimeout);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cep/operator/CEPRescalingTest$TestKeySelector.class */
    public static class TestKeySelector implements KeySelector<Event, Integer> {
        private static final long serialVersionUID = -4873366487571254798L;

        private TestKeySelector() {
        }

        public Integer getKey(Event event) throws Exception {
            return Integer.valueOf(event.getId());
        }
    }

    /**
     * Rescales the CEP operator from parallelism 1 to parallelism 2.
     *
     * <p>A single subtask receives the beginning of two matches (keys 7 and 10) and is
     * snapshotted. The snapshot is then restored into two subtasks, and each match is completed
     * on the subtask that the key-group assertions below identify as the owner of its key.
     */
    @Test
    public void testCEPFunctionScalingUp() throws Exception {
        final int maxParallelism = 10;
        final KeySelector<Event, Integer> keySelector = new TestKeySelector();

        // Key 7 -> key group 1 -> subtask 0 at parallelism 2.
        Event startEvent1 = new Event(7, "start", 1.0d);
        SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0d, 10.0d);
        Event endEvent1 = new Event(7, "end", 1.0d);

        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism);
        Assert.assertEquals(1, keyGroup);
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keyGroup));

        // Key 10 -> key group 9 -> subtask 1 at parallelism 2.
        Event startEvent2 = new Event(10, "start", 1.0d);
        SubEvent middleEvent2 = new SubEvent(10, "foo", 1.0d, 10.0d);
        Event endEvent2 = new Event(10, "end", 1.0d);

        keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent2), maxParallelism);
        Assert.assertEquals(9, keyGroup);
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keyGroup));

        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> initialHarness = null;
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> scaledHarness0 = null;
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> scaledHarness1 = null;

        try {
            // Parallelism 1: feed the partial matches, then snapshot.
            initialHarness = getTestHarness(maxParallelism, 1, 0);
            initialHarness.open();

            initialHarness.processElement(new StreamRecord<>(startEvent1, 1L));
            initialHarness.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0d), 2L));
            initialHarness.processElement(new StreamRecord<>(startEvent2, 3L));
            initialHarness.processElement(new StreamRecord<Event>(middleEvent2, 4L));

            OperatorSubtaskState snapshot = initialHarness.snapshot(0L, 0L);
            initialHarness.close();

            // Parallelism 2, subtask 0: complete the match for key 7.
            scaledHarness0 = getTestHarness(maxParallelism, 2, 0);
            scaledHarness0.setup();
            scaledHarness0.initializeState(snapshot);
            scaledHarness0.open();

            scaledHarness0.processWatermark(new Watermark(2L));
            scaledHarness0.processElement(new StreamRecord<Event>(middleEvent1, 3L));
            scaledHarness0.processElement(new StreamRecord<>(endEvent1, 5L));
            scaledHarness0.processWatermark(new Watermark(Long.MAX_VALUE));

            // Three outputs are expected; only the first two are inspected
            // (the third is presumably the final watermark).
            Assert.assertEquals(3, scaledHarness0.getOutput().size());
            verifyWatermark(scaledHarness0.getOutput().poll(), 2L);
            verifyPattern(scaledHarness0.getOutput().poll(), startEvent1, middleEvent1, endEvent1);

            // Parallelism 2, subtask 1: complete the match for key 10.
            scaledHarness1 = getTestHarness(maxParallelism, 2, 1);
            scaledHarness1.setup();
            scaledHarness1.initializeState(snapshot);
            scaledHarness1.open();

            scaledHarness1.processWatermark(new Watermark(2L));
            scaledHarness1.processElement(new StreamRecord<>(endEvent2, 5L));
            scaledHarness1.processElement(new StreamRecord<>(new Event(42, "start", 1.0d), 4L));
            scaledHarness1.processWatermark(new Watermark(Long.MAX_VALUE));

            Assert.assertEquals(3, scaledHarness1.getOutput().size());
            verifyWatermark(scaledHarness1.getOutput().poll(), 2L);
            verifyPattern(scaledHarness1.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
        } finally {
            // Best-effort cleanup on both the success and the failure path.
            closeSilently(initialHarness);
            closeSilently(scaledHarness0);
            closeSilently(scaledHarness1);
        }
    }

    /**
     * Closes the given harness on a best-effort basis.
     *
     * <p>A {@code null} harness is ignored, and anything thrown by {@code close()} is
     * discarded.
     *
     * @param harness the harness to close, may be {@code null}
     */
    private static void closeSilently(OneInputStreamOperatorTestHarness<?, ?> harness) {
        if (harness == null) {
            return;
        }
        try {
            harness.close();
        } catch (Throwable ignored) {
            // Intentionally ignored: cleanup is best-effort only.
        }
    }

    /**
     * Rescales the CEP operator from parallelism 3 to parallelism 2.
     *
     * <p>Three subtasks receive events for four keys (7, 45, 90 and 10) and are snapshotted. The
     * repackaged snapshots are restored into two subtasks, and the test checks that each match
     * is emitted by the subtask that the key-group assertions below identify as the owner of its
     * key after scaling down.
     */
    @Test
    public void testCEPFunctionScalingDown() throws Exception {
        final int maxParallelism = 10;
        final KeySelector<Event, Integer> keySelector = new TestKeySelector();

        // Key 7 -> key group 1 -> subtask 0 at parallelism 3 and at parallelism 2.
        Event startEvent1 = new Event(7, "start", 1.0d);
        SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0d, 10.0d);
        Event endEvent1 = new Event(7, "end", 1.0d);

        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism);
        Assert.assertEquals(1, keyGroup);
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keyGroup));
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keyGroup));

        // Key 45 -> key group 6 -> subtask 1 at parallelism 3 and at parallelism 2.
        Event startEvent2 = new Event(45, "start", 1.0d);
        SubEvent middleEvent2 = new SubEvent(45, "foo", 1.0d, 10.0d);
        Event endEvent2 = new Event(45, "end", 1.0d);

        keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent2), maxParallelism);
        Assert.assertEquals(6, keyGroup);
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keyGroup));
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keyGroup));

        // Key 90 -> key group 2 -> subtask 0 at parallelism 3 and at parallelism 2.
        Event startEvent3 = new Event(90, "start", 1.0d);
        SubEvent middleEvent3 = new SubEvent(90, "foo", 1.0d, 10.0d);
        Event endEvent3 = new Event(90, "end", 1.0d);

        keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent3), maxParallelism);
        Assert.assertEquals(2, keyGroup);
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keyGroup));
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keyGroup));

        // Key 10 -> key group 9 -> subtask 2 at parallelism 3, subtask 1 at parallelism 2.
        Event startEvent4 = new Event(10, "start", 1.0d);
        SubEvent middleEvent4 = new SubEvent(10, "foo", 1.0d, 10.0d);
        Event endEvent4 = new Event(10, "end", 1.0d);

        keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent4), maxParallelism);
        Assert.assertEquals(9, keyGroup);
        Assert.assertEquals(2, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keyGroup));
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keyGroup));

        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> sourceHarness0 = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> sourceHarness1 = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> sourceHarness2 = null;
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> restoredHarness0 = null;
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> restoredHarness1 = null;

        try {
            // Harnesses are created inside the try so that a failing open() cannot leak
            // the ones that were already opened.
            sourceHarness0 = getTestHarness(maxParallelism, 3, 0);
            sourceHarness0.open();
            sourceHarness1 = getTestHarness(maxParallelism, 3, 1);
            sourceHarness1.open();
            sourceHarness2 = getTestHarness(maxParallelism, 3, 2);
            sourceHarness2.open();

            sourceHarness0.processWatermark(Long.MIN_VALUE);
            sourceHarness1.processWatermark(Long.MIN_VALUE);
            sourceHarness2.processWatermark(Long.MIN_VALUE);

            // Subtask 0 of 3: events for keys 7 and 90.
            sourceHarness0.processElement(new StreamRecord<>(startEvent1, 1L));
            sourceHarness0.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0d), 2L));
            sourceHarness0.processElement(new StreamRecord<Event>(middleEvent1, 3L));
            sourceHarness0.processElement(new StreamRecord<>(endEvent1, 5L));
            sourceHarness0.processElement(new StreamRecord<>(startEvent3, 10L));
            sourceHarness0.processElement(new StreamRecord<>(startEvent1, 10L));

            // Subtask 1 of 3: start and middle for key 45.
            sourceHarness1.processElement(new StreamRecord<>(startEvent2, 7L));
            sourceHarness1.processElement(new StreamRecord<Event>(middleEvent2, 8L));

            // Subtask 2 of 3: all three events for key 10.
            sourceHarness2.processElement(new StreamRecord<>(startEvent4, 15L));
            sourceHarness2.processElement(new StreamRecord<Event>(middleEvent4, 16L));
            sourceHarness2.processElement(new StreamRecord<>(endEvent4, 17L));

            // So far each subtask has emitted only the initial watermark.
            Assert.assertEquals(1, sourceHarness0.getOutput().size());
            verifyWatermark(sourceHarness0.getOutput().poll(), Long.MIN_VALUE);
            Assert.assertEquals(1, sourceHarness1.getOutput().size());
            verifyWatermark(sourceHarness1.getOutput().poll(), Long.MIN_VALUE);
            Assert.assertEquals(1, sourceHarness2.getOutput().size());
            verifyWatermark(sourceHarness2.getOutput().poll(), Long.MIN_VALUE);

            // Snapshots are taken and passed in the order subtask 1, subtask 0, subtask 2.
            OperatorSubtaskState snapshot1 = sourceHarness1.snapshot(0L, 0L);
            OperatorSubtaskState snapshot0 = sourceHarness0.snapshot(0L, 0L);
            OperatorSubtaskState snapshot2 = sourceHarness2.snapshot(0L, 0L);
            OperatorSubtaskState repackagedState = AbstractStreamOperatorTestHarness.repackageState(
                new OperatorSubtaskState[] {snapshot1, snapshot0, snapshot2});

            // Restore into two subtasks.
            restoredHarness0 = getTestHarness(maxParallelism, 2, 0);
            restoredHarness0.setup();
            restoredHarness0.initializeState(repackagedState);
            restoredHarness0.open();

            restoredHarness1 = getTestHarness(maxParallelism, 2, 1);
            restoredHarness1.setup();
            restoredHarness1.initializeState(repackagedState);
            restoredHarness1.open();

            // Subtask 1 of 2: complete the match for key 45.
            restoredHarness1.processElement(new StreamRecord<>(endEvent2, 11L));
            restoredHarness1.processWatermark(new Watermark(12L));
            verifyPattern(restoredHarness1.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
            verifyWatermark(restoredHarness1.getOutput().poll(), 12L);

            // Subtask 0 of 2: the key 7 match fed before the snapshot is emitted on the watermark.
            restoredHarness0.processWatermark(new Watermark(12L));
            Assert.assertEquals(2, restoredHarness0.getOutput().size());
            verifyPattern(restoredHarness0.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            verifyWatermark(restoredHarness0.getOutput().poll(), 12L);

            // Complete the matches started at timestamp 10 for keys 90 and 7.
            restoredHarness0.processElement(new StreamRecord<Event>(middleEvent3, 15L));
            restoredHarness0.processElement(new StreamRecord<>(endEvent3, 16L));
            restoredHarness0.processElement(new StreamRecord<Event>(middleEvent1, 15L));
            restoredHarness0.processElement(new StreamRecord<>(endEvent1, 16L));

            restoredHarness0.processWatermark(new Watermark(Long.MAX_VALUE));
            restoredHarness1.processWatermark(new Watermark(Long.MAX_VALUE));

            Assert.assertEquals(3, restoredHarness0.getOutput().size());

            // The two matches may arrive in either order; peek at the first to decide which.
            StreamRecord<?> firstRecord = (StreamRecord<?>) restoredHarness0.getOutput().peek();
            Assert.assertTrue(firstRecord.getValue() instanceof Map);
            Map<?, ?> firstMatch = (Map<?, ?>) firstRecord.getValue();
            Event firstStart = (Event) ((List<?>) firstMatch.get("start")).get(0);
            if (firstStart.getId() == 7) {
                verifyPattern(restoredHarness0.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
                verifyPattern(restoredHarness0.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
            } else {
                verifyPattern(restoredHarness0.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
                verifyPattern(restoredHarness0.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            }

            // The key 10 match, fed to subtask 2 of 3, is emitted by subtask 1 of 2.
            Assert.assertEquals(2, restoredHarness1.getOutput().size());
            verifyPattern(restoredHarness1.getOutput().poll(), startEvent4, middleEvent4, endEvent4);
        } finally {
            // Best-effort cleanup on both the success and the failure path.
            closeSilently(sourceHarness0);
            closeSilently(sourceHarness1);
            closeSilently(sourceHarness2);
            closeSilently(restoredHarness0);
            closeSilently(restoredHarness1);
        }
    }

    /**
     * Asserts that the given output element is a {@link Watermark} with the expected timestamp.
     *
     * @param outputElement the element taken from a harness output queue
     * @param expectedTimestamp the timestamp the watermark must carry
     */
    private void verifyWatermark(Object outputElement, long expectedTimestamp) {
        Assert.assertTrue(outputElement instanceof Watermark);
        Watermark watermark = (Watermark) outputElement;
        Assert.assertEquals(expectedTimestamp, watermark.getTimestamp());
    }

    /**
     * Asserts that the given output element is a {@link StreamRecord} whose value is a map in
     * which the first entry of the lists stored under "start", "middle" and "end" equals the
     * respective expected event.
     *
     * @param outputElement the element taken from a harness output queue
     * @param start the expected first event under "start"
     * @param middle the expected first event under "middle"
     * @param end the expected first event under "end"
     */
    private void verifyPattern(Object outputElement, Event start, SubEvent middle, Event end) {
        Assert.assertTrue(outputElement instanceof StreamRecord);
        Object payload = ((StreamRecord<?>) outputElement).getValue();
        Assert.assertTrue(payload instanceof Map);
        Map<?, ?> match = (Map<?, ?>) payload;

        List<?> startEvents = (List<?>) match.get("start");
        Assert.assertEquals(start, startEvents.get(0));

        List<?> middleEvents = (List<?>) match.get("middle");
        Assert.assertEquals(middle, middleEvents.get(0));

        List<?> endEvents = (List<?>) match.get("end");
        Assert.assertEquals(end, endEvents.get(0));
    }

    /**
     * Creates a keyed test harness around the operator returned by
     * {@code CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory())}, keyed by
     * event id through {@link TestKeySelector}.
     *
     * <p>The three arguments are forwarded unchanged as the last three constructor arguments of
     * the harness. The names reflect how the tests in this class use them; confirm against the
     * harness constructor if in doubt.
     *
     * @param maxParallelism the maximum parallelism (the tests always pass 10)
     * @param parallelism the number of parallel subtasks
     * @param subtaskIndex the index of the subtask this harness represents
     * @return a new harness that has not been set up or opened yet
     */
    private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
            int maxParallelism, int parallelism, int subtaskIndex) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<>(
            CepOperatorTestUtilities.getKeyedCepOpearator(false, new NFAFactory()),
            new TestKeySelector(),
            BasicTypeInfo.INT_TYPE_INFO,
            maxParallelism,
            parallelism,
            subtaskIndex);
    }
}
