package org.apache.flink.test.javaApiOperators;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.class */
public class RemoteEnvironmentITCase {
    private static final int TM_SLOTS = 4;
    private static final int NUM_TM = 1;
    private static final int USER_DOP = 2;
    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    private static ForkableFlinkMiniCluster cluster;

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase$ParallelismDependentInputFormat.class */
    private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
        private transient boolean emitted;

        private ParallelismDependentInputFormat() {
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public GenericInputSplit[] m612createInputSplits(int i) throws IOException {
            Assert.assertEquals(2L, i);
            return super.createInputSplits(i);
        }

        public boolean reachedEnd() {
            return this.emitted;
        }

        public Integer nextRecord(Integer num) {
            if (this.emitted) {
                return null;
            }
            this.emitted = true;
            return Integer.valueOf(RemoteEnvironmentITCase.NUM_TM);
        }
    }

    @BeforeClass
    public static void setupCluster() {
        try {
            Configuration configuration = new Configuration();
            configuration.setInteger("local.number-taskmanager", NUM_TM);
            configuration.setInteger("taskmanager.numberOfTaskSlots", TM_SLOTS);
            cluster = new ForkableFlinkMiniCluster(configuration, false);
            cluster.start();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Error starting test cluster: " + e.getMessage());
        }
    }

    @AfterClass
    public static void tearDownCluster() {
        try {
            cluster.stop();
        } catch (Throwable th) {
            th.printStackTrace();
            Assert.fail("Cluster shutdown caused an exception: " + th.getMessage());
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testInvalidAkkaConfiguration() throws Throwable {
        Configuration configuration = new Configuration();
        configuration.setString("akka.startup-timeout", INVALID_STARTUP_TIMEOUT);
        ExecutionEnvironment createRemoteEnvironment = ExecutionEnvironment.createRemoteEnvironment(cluster.hostname(), cluster.getLeaderRPCPort(), configuration, new String[0]);
        createRemoteEnvironment.getConfig().disableSysoutLogging();
        createRemoteEnvironment.createInput(new TestNonRichInputFormat()).output(new LocalCollectionOutputFormat(new ArrayList()));
        try {
            createRemoteEnvironment.execute();
            Assert.fail("Program should not run successfully, cause of invalid akka settings.");
        } catch (IOException e) {
            throw e.getCause();
        }
    }

    @Test
    public void testUserSpecificParallelism() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("akka.startup-timeout", VALID_STARTUP_TIMEOUT);
        ExecutionEnvironment createRemoteEnvironment = ExecutionEnvironment.createRemoteEnvironment(cluster.hostname(), cluster.getLeaderRPCPort(), configuration, new String[0]);
        createRemoteEnvironment.setParallelism(USER_DOP);
        createRemoteEnvironment.getConfig().disableSysoutLogging();
        Assert.assertEquals(2L, createRemoteEnvironment.createInput(new ParallelismDependentInputFormat()).rebalance().mapPartition(new RichMapPartitionFunction<Integer, Integer>() { // from class: org.apache.flink.test.javaApiOperators.RemoteEnvironmentITCase.1
            public void mapPartition(Iterable<Integer> iterable, Collector<Integer> collector) throws Exception {
                collector.collect(Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
            }
        }).collect().size());
    }
}
