/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.errors;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeadLetterQueueReporter
implements ErrorReporter {
    private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class);
    private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
    public static final String HEADER_PREFIX = "__connect.errors.";
    public static final String ERROR_HEADER_ORIG_TOPIC = "__connect.errors.topic";
    public static final String ERROR_HEADER_ORIG_PARTITION = "__connect.errors.partition";
    public static final String ERROR_HEADER_ORIG_OFFSET = "__connect.errors.offset";
    public static final String ERROR_HEADER_CONNECTOR_NAME = "__connect.errors.connector.name";
    public static final String ERROR_HEADER_TASK_ID = "__connect.errors.task.id";
    public static final String ERROR_HEADER_STAGE = "__connect.errors.stage";
    public static final String ERROR_HEADER_EXECUTING_CLASS = "__connect.errors.class.name";
    public static final String ERROR_HEADER_EXCEPTION = "__connect.errors.exception.class.name";
    public static final String ERROR_HEADER_EXCEPTION_MESSAGE = "__connect.errors.exception.message";
    public static final String ERROR_HEADER_EXCEPTION_STACK_TRACE = "__connect.errors.exception.stacktrace";
    private final SinkConnectorConfig connConfig;
    private final ConnectorTaskId connectorTaskId;
    private final ErrorHandlingMetrics errorHandlingMetrics;
    private final String dlqTopicName;
    private KafkaProducer<byte[], byte[]> kafkaProducer;

    public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps, ConnectorTaskId id, SinkConnectorConfig sinkConfig, Map<String, Object> producerProps, ErrorHandlingMetrics errorHandlingMetrics) {
        block16: {
            String topic = sinkConfig.dlqTopicName();
            try (Admin admin = Admin.create(adminProps);){
                if (!admin.listTopics().names().get().contains(topic)) {
                    log.error("Topic {} doesn't exist. Will attempt to create topic.", (Object)topic);
                    NewTopic schemaTopicRequest = new NewTopic(topic, 1, sinkConfig.dlqTopicReplicationFactor());
                    admin.createTopics(Collections.singleton(schemaTopicRequest)).all().get();
                }
            }
            catch (InterruptedException e) {
                throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) break block16;
                throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
            }
        }
        KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<byte[], byte[]>(producerProps);
        return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics);
    }

    DeadLetterQueueReporter(KafkaProducer<byte[], byte[]> kafkaProducer, SinkConnectorConfig connConfig, ConnectorTaskId id, ErrorHandlingMetrics errorHandlingMetrics) {
        Objects.requireNonNull(kafkaProducer);
        Objects.requireNonNull(connConfig);
        Objects.requireNonNull(id);
        Objects.requireNonNull(errorHandlingMetrics);
        this.kafkaProducer = kafkaProducer;
        this.connConfig = connConfig;
        this.connectorTaskId = id;
        this.errorHandlingMetrics = errorHandlingMetrics;
        this.dlqTopicName = connConfig.dlqTopicName().trim();
    }

    @Override
    public Future<RecordMetadata> report(ProcessingContext context) {
        if (this.dlqTopicName.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
        ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
        if (originalMessage == null) {
            this.errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
            return CompletableFuture.completedFuture(null);
        }
        ProducerRecord<byte[], byte[]> producerRecord = originalMessage.timestamp() == -1L ? new ProducerRecord<byte[], byte[]>(this.dlqTopicName, null, originalMessage.key(), originalMessage.value(), originalMessage.headers()) : new ProducerRecord<byte[], byte[]>(this.dlqTopicName, null, originalMessage.timestamp(), originalMessage.key(), originalMessage.value(), originalMessage.headers());
        if (this.connConfig.isDlqContextHeadersEnabled()) {
            this.populateContextHeaders(producerRecord, context);
        }
        return this.kafkaProducer.send(producerRecord, (metadata, exception) -> {
            if (exception != null) {
                log.error("Could not produce message to dead letter queue. topic=" + this.dlqTopicName, exception);
                this.errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
            }
        });
    }

    void populateContextHeaders(ProducerRecord<byte[], byte[]> producerRecord, ProcessingContext context) {
        Headers headers = producerRecord.headers();
        if (context.consumerRecord() != null) {
            headers.add(ERROR_HEADER_ORIG_TOPIC, this.toBytes(context.consumerRecord().topic()));
            headers.add(ERROR_HEADER_ORIG_PARTITION, this.toBytes(context.consumerRecord().partition()));
            headers.add(ERROR_HEADER_ORIG_OFFSET, this.toBytes(context.consumerRecord().offset()));
        }
        headers.add(ERROR_HEADER_CONNECTOR_NAME, this.toBytes(this.connectorTaskId.connector()));
        headers.add(ERROR_HEADER_TASK_ID, this.toBytes(String.valueOf(this.connectorTaskId.task())));
        headers.add(ERROR_HEADER_STAGE, this.toBytes(context.stage().name()));
        headers.add(ERROR_HEADER_EXECUTING_CLASS, this.toBytes(context.executingClass().getName()));
        if (context.error() != null) {
            headers.add(ERROR_HEADER_EXCEPTION, this.toBytes(context.error().getClass().getName()));
            headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, this.toBytes(context.error().getMessage()));
            byte[] trace = this.stacktrace(context.error());
            if (trace != null) {
                headers.add(ERROR_HEADER_EXCEPTION_STACK_TRACE, trace);
            }
        }
    }

    private byte[] stacktrace(Throwable error) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            PrintStream stream = new PrintStream((OutputStream)bos, true, StandardCharsets.UTF_8.name());
            error.printStackTrace(stream);
            bos.close();
            return bos.toByteArray();
        }
        catch (IOException e) {
            log.error("Could not serialize stacktrace.", e);
            return null;
        }
    }

    private byte[] toBytes(int value) {
        return this.toBytes(String.valueOf(value));
    }

    private byte[] toBytes(long value) {
        return this.toBytes(String.valueOf(value));
    }

    private byte[] toBytes(String value) {
        if (value != null) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
        return null;
    }

    @Override
    public void close() {
        this.kafkaProducer.close();
    }
}

