package org.apache.flink.cep.nfa;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.State;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cep/nfa/NFATest.class */
public class NFATest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/cep/nfa/NFATest$NameFilter.class */
    private static class NameFilter implements FilterFunction<Event> {
        private static final long serialVersionUID = 7472112494752423802L;
        private final String name;

        public NameFilter(String str) {
            this.name = str;
        }

        public boolean filter(Event event) throws Exception {
            return event.getName().equals(this.name);
        }
    }

    @Test
    public void testSimpleNFA() {
        NFA nfa = new NFA(Event.createTypeSerializer(), 0L, false);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new StreamRecord(new Event(1, "start", 1.0d), 1L));
        arrayList.add(new StreamRecord(new Event(2, "bar", 2.0d), 2L));
        arrayList.add(new StreamRecord(new Event(3, "start", 3.0d), 3L));
        arrayList.add(new StreamRecord(new Event(4, "end", 4.0d), 4L));
        State state = new State("", State.StateType.Start);
        State state2 = new State("start", State.StateType.Normal);
        State state3 = new State("end", State.StateType.Final);
        StateTransition stateTransition = new StateTransition(StateTransitionAction.TAKE, state2, new FilterFunction<Event>() { // from class: org.apache.flink.cep.nfa.NFATest.1
            private static final long serialVersionUID = -4869589195918650396L;

            public boolean filter(Event event) throws Exception {
                return event.getName().equals("start");
            }
        });
        StateTransition stateTransition2 = new StateTransition(StateTransitionAction.TAKE, state3, new FilterFunction<Event>() { // from class: org.apache.flink.cep.nfa.NFATest.2
            private static final long serialVersionUID = 2979804163709590673L;

            public boolean filter(Event event) throws Exception {
                return event.getName().equals("end");
            }
        });
        StateTransition stateTransition3 = new StateTransition(StateTransitionAction.IGNORE, state2, (FilterFunction) null);
        state.addStateTransition(stateTransition);
        state2.addStateTransition(stateTransition2);
        state2.addStateTransition(stateTransition3);
        nfa.addState(state);
        nfa.addState(state2);
        nfa.addState(state3);
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        hashMap.put("start", new Event(1, "start", 1.0d));
        hashMap.put("end", new Event(4, "end", 4.0d));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("start", new Event(3, "start", 3.0d));
        hashMap2.put("end", new Event(4, "end", 4.0d));
        hashSet.add(hashMap);
        hashSet.add(hashMap2);
        Assert.assertEquals(hashSet, runNFA(nfa, arrayList));
    }

    @Test
    public void testTimeoutWindowPruning() {
        NFA<Event> createStartEndNFA = createStartEndNFA(2L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new StreamRecord(new Event(1, "start", 1.0d), 1L));
        arrayList.add(new StreamRecord(new Event(2, "bar", 2.0d), 2L));
        arrayList.add(new StreamRecord(new Event(3, "start", 3.0d), 3L));
        arrayList.add(new StreamRecord(new Event(4, "end", 4.0d), 4L));
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        hashMap.put("start", new Event(3, "start", 3.0d));
        hashMap.put("end", new Event(4, "end", 4.0d));
        hashSet.add(hashMap);
        Assert.assertEquals(hashSet, runNFA(createStartEndNFA, arrayList));
    }

    @Test
    public void testWindowBorders() {
        NFA<Event> createStartEndNFA = createStartEndNFA(2L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new StreamRecord(new Event(1, "start", 1.0d), 1L));
        arrayList.add(new StreamRecord(new Event(2, "end", 2.0d), 3L));
        Assert.assertEquals(Collections.emptySet(), runNFA(createStartEndNFA, arrayList));
    }

    @Test
    public void testTimeoutWindowPruningWindowBorders() {
        NFA<Event> createStartEndNFA = createStartEndNFA(2L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new StreamRecord(new Event(1, "start", 1.0d), 1L));
        arrayList.add(new StreamRecord(new Event(2, "start", 2.0d), 2L));
        arrayList.add(new StreamRecord(new Event(3, "foobar", 3.0d), 3L));
        arrayList.add(new StreamRecord(new Event(4, "end", 4.0d), 3L));
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        hashMap.put("start", new Event(2, "start", 2.0d));
        hashMap.put("end", new Event(4, "end", 4.0d));
        hashSet.add(hashMap);
        Assert.assertEquals(hashSet, runNFA(createStartEndNFA, arrayList));
    }

    @Test
    public void testStateNameGeneration() {
        String generateStateName = NFA.generateStateName("a[]", 2);
        String generateStateName2 = NFA.generateStateName("a", 3);
        String generateStateName3 = NFA.generateStateName("a[][]", 42);
        Assert.assertEquals("a[2]", generateStateName);
        Assert.assertEquals("a_3", generateStateName2);
        Assert.assertEquals("a[][42]", generateStateName3);
    }

    public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> list) {
        HashSet hashSet = new HashSet();
        for (StreamRecord<T> streamRecord : list) {
            hashSet.addAll((Collection) nfa.process(streamRecord.getValue(), streamRecord.getTimestamp()).f0);
        }
        return hashSet;
    }

    @Test
    public void testNFASerialization() throws IOException, ClassNotFoundException {
        NFA nfa = new NFA(Event.createTypeSerializer(), 0L, false);
        State state = new State("", State.StateType.Start);
        State state2 = new State("start", State.StateType.Normal);
        State state3 = new State("end", State.StateType.Final);
        StateTransition stateTransition = new StateTransition(StateTransitionAction.TAKE, state2, new NameFilter("start"));
        StateTransition stateTransition2 = new StateTransition(StateTransitionAction.TAKE, state3, new NameFilter("end"));
        StateTransition stateTransition3 = new StateTransition(StateTransitionAction.IGNORE, state2, (FilterFunction) null);
        state.addStateTransition(stateTransition);
        state2.addStateTransition(stateTransition2);
        state2.addStateTransition(stateTransition3);
        nfa.addState(state);
        nfa.addState(state2);
        nfa.addState(state3);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        new ObjectOutputStream(byteArrayOutputStream).writeObject(nfa);
        Assert.assertEquals(nfa, (NFA) new ObjectInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())).readObject());
    }

    private NFA<Event> createStartEndNFA(long j) {
        NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), j, false);
        State state = new State("", State.StateType.Start);
        State state2 = new State("start", State.StateType.Normal);
        State state3 = new State("end", State.StateType.Final);
        StateTransition stateTransition = new StateTransition(StateTransitionAction.TAKE, state2, new FilterFunction<Event>() { // from class: org.apache.flink.cep.nfa.NFATest.3
            private static final long serialVersionUID = -4869589195918650396L;

            public boolean filter(Event event) throws Exception {
                return event.getName().equals("start");
            }
        });
        StateTransition stateTransition2 = new StateTransition(StateTransitionAction.TAKE, state3, new FilterFunction<Event>() { // from class: org.apache.flink.cep.nfa.NFATest.4
            private static final long serialVersionUID = 2979804163709590673L;

            public boolean filter(Event event) throws Exception {
                return event.getName().equals("end");
            }
        });
        StateTransition stateTransition3 = new StateTransition(StateTransitionAction.IGNORE, state2, (FilterFunction) null);
        state.addStateTransition(stateTransition);
        state2.addStateTransition(stateTransition2);
        state2.addStateTransition(stateTransition3);
        nfa.addState(state);
        nfa.addState(state2);
        nfa.addState(state3);
        return nfa;
    }
}
