/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientCompatibilityTest {
    private static final Logger log = LoggerFactory.getLogger(ClientCompatibilityTest.class);
    private final TestConfig testConfig;
    private final byte[] message1;
    private final byte[] message2;

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("client-compatibility-test").defaultHelp(true).description("This tool is used to verify client compatibility guarantees.");
        parser.addArgument("--topic").action(Arguments.store()).required(true).type(String.class).dest("topic").metavar("TOPIC").help("the compatibility test will produce messages to this topic");
        parser.addArgument("--bootstrap-server").action(Arguments.store()).required(true).type(String.class).dest("bootstrapServer").metavar("BOOTSTRAP_SERVER").help("The server(s) to use for bootstrapping");
        parser.addArgument("--offsets-for-times-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("offsetsForTimesSupported").metavar("OFFSETS_FOR_TIMES_SUPPORTED").help("True if KafkaConsumer#offsetsForTimes is supported by the current broker version");
        parser.addArgument("--cluster-id-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("clusterIdSupported").metavar("CLUSTER_ID_SUPPORTED").help("True if cluster IDs are supported.  False if cluster ID always appears as null.");
        parser.addArgument("--expect-record-too-large-exception").action(Arguments.store()).required(true).type(Boolean.class).dest("expectRecordTooLargeException").metavar("EXPECT_RECORD_TOO_LARGE_EXCEPTION").help("True if we should expect a RecordTooLargeException when trying to read from a topic that contains a message that is bigger than max.partition.fetch.bytes.  This is pre-KIP-74 behavior.");
        parser.addArgument("--num-cluster-nodes").action(Arguments.store()).required(true).type(Integer.class).dest("numClusterNodes").metavar("NUM_CLUSTER_NODES").help("The number of cluster nodes we should expect to see from the AdminClient.");
        parser.addArgument("--create-topics-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("createTopicsSupported").metavar("CREATE_TOPICS_SUPPORTED").help("Whether we should be able to create topics via the AdminClient.");
        parser.addArgument("--describe-acls-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("describeAclsSupported").metavar("DESCRIBE_ACLS_SUPPORTED").help("Whether describeAcls is supported in the AdminClient.");
        parser.addArgument("--describe-configs-supported").action(Arguments.store()).required(true).type(Boolean.class).dest("describeConfigsSupported").metavar("DESCRIBE_CONFIGS_SUPPORTED").help("Whether describeConfigs is supported in the AdminClient.");
        Namespace res = null;
        try {
            res = parser.parseArgs(args);
        }
        catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            }
            parser.handleError(e);
            Exit.exit(1);
        }
        TestConfig testConfig = new TestConfig(res);
        ClientCompatibilityTest test = new ClientCompatibilityTest(testConfig);
        try {
            test.run();
        }
        catch (Throwable t) {
            System.out.printf("FAILED: Caught exception %s%n%n", t.getMessage());
            t.printStackTrace();
            Exit.exit(1);
        }
        System.out.println("SUCCESS.");
        Exit.exit(0);
    }

    private static String toHexString(byte[] buf) {
        StringBuilder bld = new StringBuilder();
        for (byte b : buf) {
            bld.append(String.format("%02x", b));
        }
        return bld.toString();
    }

    private static void compareArrays(byte[] a, byte[] b) {
        if (!Arrays.equals(a, b)) {
            throw new RuntimeException("Arrays did not match: expected " + ClientCompatibilityTest.toHexString(a) + ", got " + ClientCompatibilityTest.toHexString(b));
        }
    }

    ClientCompatibilityTest(TestConfig testConfig) {
        this.testConfig = testConfig;
        long curTime = Time.SYSTEM.milliseconds();
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(curTime);
        this.message1 = buf.array();
        ByteBuffer buf2 = ByteBuffer.allocate(4096);
        for (long i = 0L; i < (long)buf2.capacity(); i += 8L) {
            buf2.putLong(curTime + i);
        }
        this.message2 = buf2.array();
    }

    void run() throws Throwable {
        long prodTimeMs = Time.SYSTEM.milliseconds();
        this.testAdminClient();
        this.testProduce();
        this.testConsume(prodTimeMs);
    }

    public void testProduce() throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", this.testConfig.bootstrapServer);
        ByteArraySerializer serializer = new ByteArraySerializer();
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(producerProps, (Serializer<byte[]>)serializer, (Serializer<byte[]>)serializer);
        ProducerRecord record1 = new ProducerRecord(this.testConfig.topic, this.message1);
        Future<RecordMetadata> future1 = producer.send(record1);
        ProducerRecord record2 = new ProducerRecord(this.testConfig.topic, this.message2);
        Future<RecordMetadata> future2 = producer.send(record2);
        producer.flush();
        future1.get();
        future2.get();
        producer.close();
    }

    void testAdminClient() throws Throwable {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", this.testConfig.bootstrapServer);
        try (Admin client = Admin.create(adminProps);){
            Collection<Node> nodes;
            while ((nodes = client.describeCluster().nodes().get()).size() != this.testConfig.numClusterNodes) {
                if (nodes.size() > this.testConfig.numClusterNodes) {
                    throw new KafkaException("Expected to see " + this.testConfig.numClusterNodes + " nodes, but saw " + nodes.size());
                }
                Thread.sleep(1L);
                log.info("Saw only {} cluster nodes.  Waiting to see {}.", (Object)nodes.size(), (Object)this.testConfig.numClusterNodes);
            }
            this.testDescribeConfigsMethod(client);
            this.tryFeature("createTopics", this.testConfig.createTopicsSupported, () -> {
                try {
                    client.createTopics(Collections.singleton(new NewTopic("newtopic", 1, 1))).all().get();
                }
                catch (ExecutionException e) {
                    throw e.getCause();
                }
            }, () -> this.createTopicsResultTest(client, Collections.singleton("newtopic")));
            while (true) {
                Collection<TopicListing> listings = client.listTopics().listings().get();
                if (!this.testConfig.createTopicsSupported || this.topicExists(listings, "newtopic")) break;
                Thread.sleep(1L);
                log.info("Did not see newtopic.  Retrying listTopics...");
            }
            this.tryFeature("describeAclsSupported", this.testConfig.describeAclsSupported, () -> {
                try {
                    client.describeAcls(AclBindingFilter.ANY).values().get();
                }
                catch (ExecutionException e) {
                    if (e.getCause() instanceof SecurityDisabledException) {
                        return;
                    }
                    throw e.getCause();
                }
            });
        }
    }

    private void testDescribeConfigsMethod(Admin client) throws Throwable {
        this.tryFeature("describeConfigsSupported", this.testConfig.describeConfigsSupported, () -> {
            try {
                Collection<Node> nodes = client.describeCluster().nodes().get();
                ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, nodes.iterator().next().idString());
                Map<ConfigResource, Config> brokerConfig = client.describeConfigs(Collections.singleton(configResource)).all().get();
                if (brokerConfig.get(configResource).entries().isEmpty()) {
                    throw new KafkaException("Expected to see config entries, but got zero entries");
                }
            }
            catch (ExecutionException e) {
                throw e.getCause();
            }
        });
    }

    private void createTopicsResultTest(Admin client, Collection<String> topics) throws InterruptedException, ExecutionException {
        while (true) {
            try {
                client.describeTopics(topics).allTopicNames().get();
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof UnknownTopicOrPartitionException) continue;
                throw e;
            }
            break;
        }
    }

    private boolean topicExists(Collection<TopicListing> listings, String topicName) {
        boolean foundTopic = false;
        for (TopicListing listing : listings) {
            if (!listing.name().equals(topicName)) continue;
            if (listing.isInternal()) {
                throw new KafkaException(String.format("Did not expect %s to be an internal topic.", topicName));
            }
            foundTopic = true;
        }
        return foundTopic;
    }

    public void testConsume(final long prodTimeMs) throws Throwable {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", this.testConfig.bootstrapServer);
        consumerProps.put("max.partition.fetch.bytes", (Object)512);
        ClientCompatibilityTestDeserializer deserializer = new ClientCompatibilityTestDeserializer(this.testConfig.expectClusterId);
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerProps, (Deserializer<byte[]>)deserializer, (Deserializer<byte[]>)deserializer);){
            block21: {
                List<PartitionInfo> partitionInfos = consumer.partitionsFor(this.testConfig.topic);
                if (partitionInfos.size() < 1) {
                    throw new RuntimeException("Expected at least one partition for topic " + this.testConfig.topic);
                }
                HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
                LinkedList<TopicPartition> topicPartitions = new LinkedList<TopicPartition>();
                for (PartitionInfo partitionInfo : partitionInfos) {
                    TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    timestampsToSearch.put(topicPartition, prodTimeMs);
                    topicPartitions.add(topicPartition);
                }
                OffsetsForTime offsetsForTime = new OffsetsForTime();
                this.tryFeature("offsetsForTimes", this.testConfig.offsetsForTimesSupported, () -> {
                    offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
                }, () -> log.info("offsetsForTime = {}", (Object)offsetsForTime.result));
                consumer.beginningOffsets(timestampsToSearch.keySet());
                consumer.endOffsets(timestampsToSearch.keySet());
                consumer.assign(topicPartitions);
                consumer.seekToBeginning(topicPartitions);
                Iterator<byte[]> iter = new Iterator<byte[]>(){
                    private static final int TIMEOUT_MS = 10000;
                    private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
                    private byte[] next = null;

                    private byte[] fetchNext() {
                        while (true) {
                            long curTime;
                            if ((curTime = Time.SYSTEM.milliseconds()) - prodTimeMs > 10000L) {
                                throw new RuntimeException("Timed out after 10000 ms.");
                            }
                            if (this.recordIter == null) {
                                ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
                                this.recordIter = records.iterator();
                            }
                            if (this.recordIter.hasNext()) {
                                return this.recordIter.next().value();
                            }
                            this.recordIter = null;
                        }
                    }

                    @Override
                    public boolean hasNext() {
                        if (this.next != null) {
                            return true;
                        }
                        this.next = this.fetchNext();
                        return this.next != null;
                    }

                    @Override
                    public byte[] next() {
                        if (!this.hasNext()) {
                            throw new NoSuchElementException();
                        }
                        byte[] cur = this.next;
                        this.next = null;
                        return cur;
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
                byte[] next = (byte[])iter.next();
                try {
                    ClientCompatibilityTest.compareArrays(this.message1, next);
                    log.debug("Found first message...");
                }
                catch (RuntimeException e) {
                    throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when running this program.");
                }
                try {
                    next = (byte[])iter.next();
                    if (this.testConfig.expectRecordTooLargeException) {
                        throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record bigger than max.partition.fetch.bytes");
                    }
                    try {
                        ClientCompatibilityTest.compareArrays(this.message2, next);
                    }
                    catch (RuntimeException e) {
                        System.out.println("The second message in this topic was not ours. Please use a new topic when running this program.");
                        Exit.exit(1);
                    }
                }
                catch (RecordTooLargeException e) {
                    log.debug("Got RecordTooLargeException", e);
                    if (this.testConfig.expectRecordTooLargeException) break block21;
                    throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record bigger than max.partition.fetch.bytes");
                }
            }
            log.debug("Closing consumer.");
        }
        log.info("Closed consumer.");
    }

    private void tryFeature(String featureName, boolean supported, Invoker invoker) throws Throwable {
        this.tryFeature(featureName, supported, invoker, () -> {});
    }

    private void tryFeature(String featureName, boolean supported, Invoker invoker, ResultTester resultTester) throws Throwable {
        try {
            invoker.invoke();
            log.info("Successfully used feature {}", (Object)featureName);
        }
        catch (UnsupportedVersionException e) {
            log.info("Got UnsupportedVersionException when attempting to use feature {}", (Object)featureName);
            if (supported) {
                throw new RuntimeException("Expected " + featureName + " to be supported, but it wasn't.", e);
            }
            return;
        }
        if (!supported) {
            throw new RuntimeException("Did not expect " + featureName + " to be supported, but it was.");
        }
        resultTester.test();
    }

    private static interface ResultTester {
        public void test() throws Throwable;
    }

    private static interface Invoker {
        public void invoke() throws Throwable;
    }

    public static class ClientCompatibilityTestDeserializer
    implements Deserializer<byte[]>,
    ClusterResourceListener {
        private final boolean expectClusterId;

        ClientCompatibilityTestDeserializer(boolean expectClusterId) {
            this.expectClusterId = expectClusterId;
        }

        @Override
        public byte[] deserialize(String topic, byte[] data) {
            return data;
        }

        @Override
        public void onUpdate(ClusterResource clusterResource) {
            if (this.expectClusterId) {
                if (clusterResource.clusterId() == null) {
                    throw new RuntimeException("Expected cluster id to be supported, but it was null.");
                }
            } else if (clusterResource.clusterId() != null) {
                throw new RuntimeException("Expected cluster id to be null, but it was supported.");
            }
        }
    }

    private static class OffsetsForTime {
        Map<TopicPartition, OffsetAndTimestamp> result;

        private OffsetsForTime() {
        }

        public String toString() {
            return this.result.toString();
        }
    }

    static class TestConfig {
        final String bootstrapServer;
        final String topic;
        final boolean offsetsForTimesSupported;
        final boolean expectClusterId;
        final boolean expectRecordTooLargeException;
        final int numClusterNodes;
        final boolean createTopicsSupported;
        final boolean describeAclsSupported;
        final boolean describeConfigsSupported;

        TestConfig(Namespace res) {
            this.bootstrapServer = res.getString("bootstrapServer");
            this.topic = res.getString("topic");
            this.offsetsForTimesSupported = res.getBoolean("offsetsForTimesSupported");
            this.expectClusterId = res.getBoolean("clusterIdSupported");
            this.expectRecordTooLargeException = res.getBoolean("expectRecordTooLargeException");
            this.numClusterNodes = res.getInt("numClusterNodes");
            this.createTopicsSupported = res.getBoolean("createTopicsSupported");
            this.describeAclsSupported = res.getBoolean("describeAclsSupported");
            this.describeConfigsSupported = res.getBoolean("describeConfigsSupported");
        }
    }
}

