/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.errors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ErrantRecordReporter} implementation that forwards failed sink records to a
 * {@link RetryWithToleranceOperator} and keeps track, per topic partition, of the report
 * futures that had not completed yet at the time they were created, so that they can later
 * be awaited ({@link #awaitFutures(Collection)}) or cancelled ({@link #cancelFutures(Collection)}).
 */
public class WorkerErrantRecordReporter implements ErrantRecordReporter {
    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
    private final RetryWithToleranceOperator retryWithToleranceOperator;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    /** Outstanding (not yet done when reported) futures, grouped by the partition of the reported record. */
    protected final ConcurrentMap<TopicPartition, List<Future<Void>>> futures;

    /**
     * @param retryWithToleranceOperator operator that receives every reported failure
     * @param keyConverter converter used to serialize keys of records that carry no original consumer record
     * @param valueConverter converter used to serialize values of such records
     * @param headerConverter converter used to serialize headers of such records
     */
    public WorkerErrantRecordReporter(RetryWithToleranceOperator retryWithToleranceOperator, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
        this.retryWithToleranceOperator = retryWithToleranceOperator;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
        this.headerConverter = headerConverter;
        this.futures = new ConcurrentHashMap<>();
    }

    /**
     * Reports a failed record. An {@link InternalSinkRecord} is reported using its original
     * consumer record; any other record is serialized back into a consumer record with the
     * configured converters. The failure is handed to the operator as a {@link Stage#TASK_PUT}
     * failure of {@link SinkTask}.
     *
     * @param record the record that failed
     * @param error the error associated with the record
     * @return the future returned by the operator; it is also tracked internally when it is
     *         not already done
     */
    @Override
    public Future<Void> report(SinkRecord record, Throwable error) {
        ConsumerRecord<byte[], byte[]> consumerRecord;
        if (record instanceof InternalSinkRecord) {
            consumerRecord = ((InternalSinkRecord) record).originalRecord();
        } else {
            consumerRecord = this.toConsumerRecord(record);
        }

        Future<Void> future = this.retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, consumerRecord, error);
        if (!future.isDone()) {
            TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            // Append inside compute() so that the list mutation is atomic with respect to the
            // remove() performed by futuresFor(); a computeIfAbsent(...).add(...) pair could add
            // to a list that was already detached from the map, or race on the ArrayList.
            this.futures.compute(partition, (tp, pending) -> {
                List<Future<Void>> target = pending;
                if (target == null) {
                    target = new ArrayList<>();
                }
                target.add(future);
                return target;
            });
        }
        return future;
    }

    /** Serializes a sink record into a raw consumer record using the configured converters. */
    private ConsumerRecord<byte[], byte[]> toConsumerRecord(SinkRecord record) {
        String topic = record.topic();
        byte[] key = this.keyConverter.fromConnectData(topic, record.keySchema(), record.key());
        byte[] value = this.valueConverter.fromConnectData(topic, record.valueSchema(), record.value());

        RecordHeaders headers = new RecordHeaders();
        if (record.headers() != null) {
            for (Header header : record.headers()) {
                String headerKey = header.key();
                byte[] rawHeader = this.headerConverter.fromConnectHeader(topic, headerKey, header.schema(), header.value());
                headers.add(headerKey, rawHeader);
            }
        }

        // -1 marks an absent key/value size.
        int keyLength = key != null ? key.length : -1;
        int valLength = value != null ? value.length : -1;

        // timestamp() is a boxed Long; unboxing a null would throw a NullPointerException,
        // so fall back to the consumer record's "no timestamp" sentinel.
        Long recordTimestamp = record.timestamp();
        long timestamp = recordTimestamp != null ? recordTimestamp : ConsumerRecord.NO_TIMESTAMP;

        return new ConsumerRecord<byte[], byte[]>(topic, (int) record.kafkaPartition(), record.kafkaOffset(), timestamp, record.timestampType(), keyLength, valLength, key, value, headers, Optional.empty());
    }

    /**
     * Removes the tracked futures of the given partitions and blocks until each has completed.
     *
     * @param topicPartitions the partitions whose outstanding reports should be awaited
     * @throws ConnectException if waiting is interrupted or a future completed exceptionally;
     *         on interruption the calling thread's interrupt flag is restored first
     */
    public void awaitFutures(Collection<TopicPartition> topicPartitions) {
        for (Future<Void> future : this.futuresFor(topicPartitions)) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Preserve the interrupt for code further up the stack.
                Thread.currentThread().interrupt();
                log.error("Encountered an error while awaiting an errant record future's completion.", e);
                throw new ConnectException(e);
            } catch (ExecutionException e) {
                log.error("Encountered an error while awaiting an errant record future's completion.", e);
                throw new ConnectException(e);
            }
        }
    }

    /**
     * Removes the tracked futures of the given partitions and attempts to cancel each of them.
     * Failures to cancel are logged and otherwise ignored.
     *
     * @param topicPartitions the partitions whose outstanding reports should be cancelled
     */
    public void cancelFutures(Collection<TopicPartition> topicPartitions) {
        for (Future<Void> future : this.futuresFor(topicPartitions)) {
            try {
                future.cancel(true);
            } catch (Exception e) {
                // Best effort: a future that cannot be cancelled must not stop the others.
                log.error("Encountered an error while cancelling an errant record future", e);
            }
        }
    }

    /**
     * Detaches and returns all futures currently tracked for the given partitions.
     * After this call the partitions have no tracked futures.
     */
    private Collection<Future<Void>> futuresFor(Collection<TopicPartition> topicPartitions) {
        List<Future<Void>> drained = new ArrayList<>();
        for (TopicPartition topicPartition : topicPartitions) {
            List<Future<Void>> pending = this.futures.remove(topicPartition);
            if (pending != null) {
                drained.addAll(pending);
            }
        }
        return drained;
    }

    /**
     * A {@link Future} that aggregates a list of producer futures: it is done once all of
     * them are done, and getting its result waits for each of them in order.
     */
    public static class ErrantRecordFuture implements Future<Void> {
        private final List<Future<RecordMetadata>> futures;

        /**
         * @param producerFutures the futures to aggregate; the list is used as-is, not copied
         */
        public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
            this.futures = producerFutures;
        }

        /**
         * Cancellation is not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
        }

        /** @return always {@code false}, since this future cannot be cancelled */
        @Override
        public boolean isCancelled() {
            return false;
        }

        /** @return {@code true} if every aggregated future is done (vacuously true for an empty list) */
        @Override
        public boolean isDone() {
            return this.futures.stream().allMatch(Future::isDone);
        }

        /**
         * Waits for every aggregated future in order.
         *
         * @return always {@code null}
         * @throws InterruptedException if the wait on any aggregated future is interrupted
         * @throws ExecutionException if any aggregated future completed exceptionally
         */
        @Override
        public Void get() throws InterruptedException, ExecutionException {
            for (Future<RecordMetadata> future : this.futures) {
                future.get();
            }
            return null;
        }

        /**
         * Waits for every aggregated future in order, spending at most {@code timeout} in total.
         *
         * @param timeout the maximum total time to wait across all aggregated futures
         * @param unit the unit of {@code timeout}
         * @return always {@code null}
         * @throws InterruptedException if the wait on any aggregated future is interrupted
         * @throws ExecutionException if any aggregated future completed exceptionally
         * @throws TimeoutException if the overall time budget elapsed before all futures completed
         */
        @Override
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            // One deadline shared by all futures; passing the full timeout to each of them
            // would allow the total wait to grow to (number of futures) x timeout.
            long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
            for (Future<RecordMetadata> future : this.futures) {
                long remainingNanos = Math.max(deadlineNanos - System.nanoTime(), 0L);
                future.get(remainingNanos, TimeUnit.NANOSECONDS);
            }
            return null;
        }
    }
}

