/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.tools.VerifiableSourceConnector;
import org.apache.kafka.tools.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source task that emits exactly one record per {@link #poll()} call. Each record is keyed by
 * the task id and carries a monotonically increasing sequence number as its value.
 *
 * <p>For every record produced, and again for every record committed, a single JSON line
 * describing it is printed to standard output. The sequence number is also stored as the
 * source offset, so a restarted task resumes at the value following the last stored offset.
 */
public class VerifiableSourceTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class);

    public static final String NAME_CONFIG = "name";
    public static final String ID_CONFIG = "id";
    public static final String TOPIC_CONFIG = "topic";
    public static final String THROUGHPUT_CONFIG = "throughput";

    /** Key of the single entry in the source partition map. */
    private static final String ID_FIELD = "id";
    /** Key of the single entry in the source offset map, and of the seqno in the JSON output. */
    private static final String SEQNO_FIELD = "seqno";

    private static final ObjectMapper JSON_SERDE = new ObjectMapper();

    private String name;
    private int id;
    private String topic;
    private Map<String, Integer> partition;
    /** Sequence number this run started from; used to count records emitted since start. */
    private long startingSeqno;
    /** Sequence number that the next call to {@link #poll()} will emit. */
    private long seqno;
    private ThroughputThrottler throttler;

    /**
     * @return the version reported by {@link VerifiableSourceConnector}
     */
    @Override
    public String version() {
        return new VerifiableSourceConnector().version();
    }

    /**
     * Reads the task configuration, restores the sequence number from the stored offset (if any)
     * and creates the throttler.
     *
     * @param props task configuration; {@code id} and {@code throughput} must be parseable numbers
     * @throws ConnectException if {@code id} or {@code throughput} is missing or not numeric, or
     *         if a stored offset exists but does not contain a numeric {@code seqno}
     */
    @Override
    public void start(Map<String, String> props) {
        final long throughput;
        try {
            name = props.get(NAME_CONFIG);
            // parseInt/parseLong raise NumberFormatException for null too, so absent settings
            // are reported through the same ConnectException as malformed ones.
            id = Integer.parseInt(props.get(ID_CONFIG));
            topic = props.get(TOPIC_CONFIG);
            throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
        } catch (NumberFormatException e) {
            throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
        }

        partition = Collections.singletonMap(ID_FIELD, id);
        Map<String, Object> previousOffset = context.offsetStorageReader().offset(partition);
        seqno = previousOffset == null ? 0L : nextSeqno(previousOffset);
        startingSeqno = seqno;
        throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());

        log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
    }

    /**
     * Produces the next record, delegating pacing to the {@link ThroughputThrottler}.
     *
     * @return a singleton list holding the record for the current sequence number
     */
    @Override
    public List<SourceRecord> poll() {
        long sendStartMs = System.currentTimeMillis();
        // The throttler is given the number of records emitted since this task started.
        if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs)) {
            throttler.throttle();
        }

        // Timestamp is taken after any throttling so it reflects when the record is handed over.
        printJson(System.currentTimeMillis(), seqno, false);

        Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
        seqno++;
        return Collections.singletonList(srcRecord);
    }

    /**
     * Prints a JSON line marking the given record as committed.
     *
     * @param record the record that was committed; its value is reported as the sequence number
     * @param metadata not used by this task
     */
    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        printJson(System.currentTimeMillis(), record.value(), true);
    }

    /**
     * Wakes up the throttler. Does nothing when {@link #start(Map)} never got far enough to
     * create it, in which case the field is still {@code null}.
     */
    @Override
    public void stop() {
        if (throttler != null) {
            throttler.wakeup();
        }
    }

    /**
     * Computes the sequence number to resume from, given a previously stored offset.
     * Any {@link Number} is accepted so the restore does not depend on the exact boxed type
     * the offset store hands back.
     */
    private static long nextSeqno(Map<String, Object> previousOffset) {
        Object lastSeqno = previousOffset.get(SEQNO_FIELD);
        if (!(lastSeqno instanceof Number)) {
            throw new ConnectException("Stored offset for VerifiableSourceTask has no numeric '"
                    + SEQNO_FIELD + "' field: " + previousOffset);
        }
        return ((Number) lastSeqno).longValue() + 1L;
    }

    /**
     * Prints one JSON line describing a record to standard output. The {@code committed} entry
     * is only included for commit notifications.
     */
    private void printJson(long timeMs, Object seqnoValue, boolean committed) {
        Map<String, Object> data = new HashMap<>();
        data.put(NAME_CONFIG, name);
        data.put("task", id);
        data.put(TOPIC_CONFIG, topic);
        data.put("time_ms", timeMs);
        data.put(SEQNO_FIELD, seqnoValue);
        if (committed) {
            data.put("committed", true);
        }

        String dataJson;
        try {
            dataJson = JSON_SERDE.writeValueAsString(data);
        } catch (JsonProcessingException e) {
            // Still emit a line so the failure is visible in the output stream.
            dataJson = "Bad data can't be written as json: " + e.getMessage();
        }
        System.out.println(dataJson);
    }
}

