/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.LoggingContext;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives a snapshot change event source followed by a streaming change event source
 * on a dedicated executor, and exposes start/stop/commit entry points for the owning task.
 *
 * <p>{@link #start} submits the whole snapshot-then-stream sequence to {@link #executor};
 * {@link #stop()} flips the {@code running} flag, shuts the executor down and unregisters
 * the metrics. Failures raised by the submitted work are handed to {@link #errorHandler}.
 *
 * @param <P> partition type handled by the sources
 * @param <O> offset context type handled by the sources
 */
@ThreadSafe
public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventSourceCoordinator.class);

    /** Upper bound applied to each wait for executor termination in {@link #stop()}. */
    public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90L);

    protected final Offsets<P, O> previousOffsets;
    protected final ErrorHandler errorHandler;
    protected final ChangeEventSourceFactory<P, O> changeEventSourceFactory;
    protected final ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory;
    protected final ExecutorService executor;
    protected final EventDispatcher<P, ?> eventDispatcher;
    protected final DatabaseSchema<?> schema;

    /** Set by {@link #start} and cleared by {@link #stop()}; read by the sources via the context. */
    private volatile boolean running;

    /** Current streaming source, or {@code null} when none is active; read by {@link #commitOffset}. */
    protected volatile StreamingChangeEventSource<P, O> streamingSource;

    /** Held while {@link #streamingSource} is being cleared; {@link #commitOffset} backs off while it is locked. */
    protected final ReentrantLock commitOffsetLock = new ReentrantLock();

    // Both metrics objects are created in start(); they are null until then.
    protected SnapshotChangeEventSourceMetrics<P> snapshotMetrics;
    protected StreamingChangeEventSourceMetrics<P> streamingMetrics;

    /**
     * Stores the collaborators and creates the executor used to run the change event sources.
     *
     * @param previousOffsets offsets (partition and offset context) to resume from
     * @param errorHandler receives any throwable raised by the submitted work
     * @param connectorType connector class, passed to the executor factory
     * @param connectorConfig supplies the logical name passed to the executor factory
     * @param changeEventSourceFactory creates the snapshot, streaming and incremental snapshot sources
     * @param changeEventSourceMetricsFactory creates the snapshot and streaming metrics
     * @param eventDispatcher dispatcher whose event listener is switched between the two metrics objects
     * @param schema schema checked after the snapshot has run
     */
    public ChangeEventSourceCoordinator(Offsets<P, O> previousOffsets, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig, ChangeEventSourceFactory<P, O> changeEventSourceFactory, ChangeEventSourceMetricsFactory<P> changeEventSourceMetricsFactory, EventDispatcher<P, ?> eventDispatcher, DatabaseSchema<?> schema) {
        this.previousOffsets = previousOffsets;
        this.errorHandler = errorHandler;
        this.changeEventSourceFactory = changeEventSourceFactory;
        this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory;
        this.executor = Threads.newSingleThreadExecutor(connectorType, connectorConfig.getLogicalName(), "change-event-source-coordinator");
        this.eventDispatcher = eventDispatcher;
        this.schema = schema;
    }

    /**
     * Creates the metrics, marks the coordinator as running and submits the
     * snapshot-then-stream sequence to the executor.
     *
     * @param taskContext used to configure the logging context and to build the metrics
     * @param changeEventQueueMetrics queue metrics passed to the metrics factory
     * @param metadataProvider metadata provider passed to the metrics factory
     */
    public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) {
        final AtomicReference<LoggingContext.PreviousContext> previousLogContext = new AtomicReference<>();
        try {
            this.snapshotMetrics = this.changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
            this.streamingMetrics = this.changeEventSourceMetricsFactory.getStreamingMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
            this.running = true;

            // Submitted to the executor rather than run inline.
            this.executor.submit(() -> {
                try {
                    previousLogContext.set(taskContext.configureLoggingContext("snapshot"));
                    this.snapshotMetrics.register();
                    this.streamingMetrics.register();
                    LOGGER.info("Metrics registered");
                    ChangeEventSource.ChangeEventSourceContext context = new ChangeEventSourceContextImpl();
                    LOGGER.info("Context created");
                    SnapshotChangeEventSource<P, O> snapshotSource = this.changeEventSourceFactory.getSnapshotChangeEventSource(this.snapshotMetrics);
                    this.executeChangeEventSources(taskContext, snapshotSource, this.previousOffsets, previousLogContext, context);
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Change event source executor was interrupted", e);
                }
                catch (Throwable e) {
                    this.errorHandler.setProducerThrowable(e);
                }
                finally {
                    this.streamingConnected(false);
                }
            });
        }
        finally {
            // NOTE(review): this runs on the calling thread right after submit(), while the reference
            // is populated by the submitted task; it will presumably often still be null here - confirm
            // that restoring on this thread is the intended behaviour.
            final LoggingContext.PreviousContext previous = previousLogContext.get();
            if (previous != null) {
                previous.restore();
            }
        }
    }

    /**
     * Runs the snapshot for the single partition/offset in {@code previousOffsets} and, if the
     * coordinator is still running and the snapshot was completed or skipped, starts streaming
     * from the offset reported by the snapshot result.
     *
     * @param taskContext used to switch the logging context between "snapshot" and "streaming"
     * @param snapshotSource snapshot source to execute
     * @param previousOffsets offsets from which the only partition and only offset are taken
     * @param previousLogContext receives the previous logging context on each switch
     * @param context running-state context handed to the sources
     * @throws InterruptedException if a source is interrupted
     */
    protected void executeChangeEventSources(CdcSourceTaskContext taskContext, SnapshotChangeEventSource<P, O> snapshotSource, Offsets<P, O> previousOffsets, AtomicReference<LoggingContext.PreviousContext> previousLogContext, ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        final P partition = previousOffsets.getTheOnlyPartition();
        final O previousOffset = previousOffsets.getTheOnlyOffset();

        previousLogContext.set(taskContext.configureLoggingContext("snapshot", (Partition) partition));
        final SnapshotResult<O> snapshotResult = this.doSnapshot(snapshotSource, context, partition, previousOffset);

        if (this.running && snapshotResult.isCompletedOrSkipped()) {
            previousLogContext.set(taskContext.configureLoggingContext("streaming", (Partition) partition));
            this.streamEvents(context, partition, snapshotResult.getOffset());
        }
    }

    /**
     * Runs the optional catch-up streaming hook and then the snapshot itself.
     *
     * <p>If catch-up streaming was performed, the streaming connection metric is reset and
     * {@link #streamingSource} is cleared under {@link #commitOffsetLock} before the snapshot runs.
     * After the snapshot, {@code schema.assureNonEmptySchema()} is invoked when the snapshot status is
     * {@code COMPLETED} or the schema reports complete table information.
     *
     * @return the result returned by the snapshot source
     * @throws InterruptedException if catch-up streaming or the snapshot is interrupted
     */
    protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotSource, ChangeEventSource.ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException {
        final CatchUpStreamingResult catchUpStreamingResult = this.executeCatchUpStreaming(context, snapshotSource, partition, previousOffset);
        if (catchUpStreamingResult.performedCatchUpStreaming) {
            this.streamingConnected(false);
            this.commitOffsetLock.lock();
            try {
                this.streamingSource = null;
            }
            finally {
                // Always release, so commitOffset() is never blocked out permanently.
                this.commitOffsetLock.unlock();
            }
        }

        this.eventDispatcher.setEventListener(this.snapshotMetrics);
        final SnapshotResult<O> snapshotResult = snapshotSource.execute(context, partition, previousOffset);
        LOGGER.info("Snapshot ended with {}", snapshotResult);

        if (snapshotResult.getStatus() == SnapshotResult.SnapshotResultStatus.COMPLETED || this.schema.tableInformationComplete()) {
            this.schema.assureNonEmptySchema();
        }
        return snapshotResult;
    }

    /**
     * Hook invoked before the snapshot. This base implementation does nothing and reports that
     * no catch-up streaming was performed.
     *
     * @return a result whose {@code performedCatchUpStreaming} flag is {@code false}
     * @throws InterruptedException declared for overriding implementations; never thrown here
     */
    protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSource.ChangeEventSourceContext context, SnapshotChangeEventSource<P, O> snapshotSource, P partition, O previousOffset) throws InterruptedException {
        return new CatchUpStreamingResult(false);
    }

    /**
     * Initializes streaming via {@link #initStreamEvents} and then executes the streaming source.
     *
     * @throws InterruptedException if initialization or streaming is interrupted
     */
    protected void streamEvents(ChangeEventSource.ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException {
        this.initStreamEvents(partition, offsetContext);
        LOGGER.info("Starting streaming");
        this.streamingSource.execute(context, partition, offsetContext);
        LOGGER.info("Finished streaming");
    }

    /**
     * Obtains the streaming source from the factory, switches the dispatcher's listener to the
     * streaming metrics, marks streaming as connected, initializes the streaming source and then
     * wires up (and initializes, when present) the incremental snapshot source.
     *
     * @throws InterruptedException if initialization is interrupted
     */
    protected void initStreamEvents(P partition, O offsetContext) throws InterruptedException {
        this.streamingSource = this.changeEventSourceFactory.getStreamingChangeEventSource();
        this.eventDispatcher.setEventListener(this.streamingMetrics);
        this.streamingConnected(true);
        this.streamingSource.init();
        // NOTE(review): the raw Optional looks like a decompiler artifact. With a raw type the lambda
        // parameter below is an Object, so the element type declared by the factory method needs to be
        // restored for x.init(...) to resolve - confirm against ChangeEventSourceFactory.
        Optional incrementalSnapshotChangeEventSource = this.changeEventSourceFactory.getIncrementalSnapshotChangeEventSource(offsetContext, this.snapshotMetrics, this.snapshotMetrics);
        this.eventDispatcher.setIncrementalSnapshotChangeEventSource(incrementalSnapshotChangeEventSource);
        incrementalSnapshotChangeEventSource.ifPresent(x -> x.init(partition, (OffsetContext)offsetContext));
    }

    /**
     * Forwards an offset commit to the current streaming source.
     *
     * <p>Nothing happens when {@code offset} is {@code null}, when {@link #commitOffsetLock} is
     * currently locked, or when there is no streaming source.
     *
     * @param offset the offset to commit; may be {@code null}
     */
    public void commitOffset(Map<String, ?> offset) {
        if (offset == null || this.commitOffsetLock.isLocked()) {
            return;
        }
        // Read the volatile field once: it can be set to null concurrently by doSnapshot(), and
        // re-reading it after the null check could dereference null.
        final StreamingChangeEventSource<P, O> source = this.streamingSource;
        if (source != null) {
            source.commitOffset(offset);
        }
    }

    /**
     * Stops the coordinator: clears the running flag, shuts the executor down (forcefully if it
     * does not terminate within {@link #SHUTDOWN_WAIT_TIMEOUT}) and unregisters the metrics.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for termination
     */
    public synchronized void stop() throws InterruptedException {
        this.running = false;
        try {
            // Clear a pending interrupt so the wait below is not aborted immediately.
            Thread.interrupted();
            this.executor.shutdown();
            final boolean terminated = this.executor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            if (!terminated) {
                LOGGER.warn("Coordinator didn't stop in the expected time, shutting down executor now");
                // Clear the flag again before the second wait.
                Thread.interrupted();
                this.executor.shutdownNow();
                this.executor.awaitTermination(SHUTDOWN_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            }
        }
        finally {
            // The metrics are only created in start(); guard against stop() without a prior start(),
            // and make sure a failure in the first unregister does not skip the second.
            try {
                if (this.snapshotMetrics != null) {
                    this.snapshotMetrics.unregister();
                }
            }
            finally {
                if (this.streamingMetrics != null) {
                    this.streamingMetrics.unregister();
                }
            }
        }
    }

    /**
     * Updates the streaming "connected" metric, but only when the metrics factory reports that
     * the coordinator is responsible for it.
     *
     * @param status the connection state to publish
     */
    protected void streamingConnected(boolean status) {
        if (this.changeEventSourceMetricsFactory.connectionMetricHandledByCoordinator()) {
            this.streamingMetrics.connected(status);
            LOGGER.info("Connected metrics set to '{}'", status);
        }
    }

    /** Outcome of {@link #executeCatchUpStreaming}: whether any catch-up streaming took place. */
    protected class CatchUpStreamingResult {
        public boolean performedCatchUpStreaming;

        public CatchUpStreamingResult(boolean performedCatchUpStreaming) {
            this.performedCatchUpStreaming = performedCatchUpStreaming;
        }
    }

    /** Context handed to the sources; reports the coordinator's {@code running} flag. */
    public class ChangeEventSourceContextImpl
    implements ChangeEventSource.ChangeEventSourceContext {
        @Override
        public boolean isRunning() {
            return ChangeEventSourceCoordinator.this.running;
        }
    }
}

