/*
 * Decompiled with CFR 0.152.
 */
package io.amient.affinity.core.storage;

import io.amient.affinity.core.storage.LogEntry;
import io.amient.affinity.core.storage.LogStorage;
import io.amient.affinity.core.storage.MemStore;
import io.amient.affinity.core.storage.ObservableState;
import io.amient.affinity.core.storage.Record;
import io.amient.affinity.core.util.EventTime;
import io.amient.affinity.core.util.MappedJavaFuture;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Log<POS extends Comparable<POS>>
extends Thread
implements Closeable {
    /** Logger shared by all instances of this class. */
    private static final Logger log = LoggerFactory.getLogger(Log.class);

    /** Current lifecycle state; starts in {@code INIT} and is moved by the public operations of this class. */
    private volatile FSM fsm = FSM.INIT;

    /** Latest known log position; {@code null} until one is restored from the checkpoint file or recorded. */
    private final AtomicReference<POS> checkpoint = new AtomicReference<>(null);

    /** Raised when the checkpoint advances while persistence is enabled; cleared after the checkpoint file is written. */
    private volatile boolean checkpointModified = false;

    /** Loop guard for {@link #run()}; {@code true} until {@code run()} starts and again once {@link #close()} completes. */
    private volatile boolean stopped = true;

    /** Whether a checkpoint file was supplied, i.e. whether the checkpoint is persisted at all. */
    private final boolean enabled;

    /** Location of the persisted checkpoint; {@code null} when persistence is disabled. */
    private final Path checkpointFile;

    /** Underlying storage that records are appended to and read back from. */
    private final LogStorage<POS> storage;

    /** Holds the tail-mode synchronization thread, or {@code null} when no such thread is installed. */
    private final AtomicReference<LogSync> logsync = new AtomicReference<>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Log(LogStorage<POS> storage, Path checkpointFile) {
        this.storage = storage;
        this.enabled = checkpointFile != null;
        this.checkpointFile = checkpointFile;
        if (this.enabled) {
            if (Files.exists(checkpointFile, new LinkOption[0])) {
                try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(checkpointFile.toFile()));){
                    this.checkpoint.set((Comparable)ois.readObject());
                }
                catch (Throwable e) {
                    log.warn("Invalid checkpoint file: " + checkpointFile + ", going to rewind fully.", e);
                    this.checkpoint.set(null);
                }
            }
            log.info("Initialized " + this.checkpoint + " from " + checkpointFile);
        }
    }

    /**
     * Returns the checkpoint currently held in memory.
     *
     * @return the latest recorded position, or {@code null} if none has been set
     */
    public POS getCheckpoint() {
        final POS current = this.checkpoint.get();
        return current;
    }

    /**
     * Moves the state machine into {@code WRITE}.
     *
     * <p>Coming from {@code TAIL} the log sync thread is stopped first; coming from
     * {@code INIT} the transition is rejected.
     *
     * @throws IllegalStateException if the log has not been bootstrapped yet
     */
    private void fsmEnterWriteState() {
        final FSM current = this.fsm;
        if (current == FSM.INIT) {
            throw new IllegalStateException("Bootstrap is required before writing");
        }
        if (current == FSM.TAIL) {
            this.stopLogSync();
        }
        this.fsm = FSM.WRITE;
    }

    /**
     * Appends a record to the storage and, once the storage reports its position,
     * puts the record into the given store and advances the checkpoint.
     *
     * @param kvstore         store updated when the append completes
     * @param key             record key
     * @param valueBytes      record value
     * @param recordTimestamp timestamp stored with the record
     * @return a future yielding the position reported by the storage for the record
     * @throws IllegalStateException if the log has not been bootstrapped yet
     */
    public Future<POS> append(final MemStore kvstore, final byte[] key, final byte[] valueBytes, final long recordTimestamp) {
        this.fsmEnterWriteState();
        return new MappedJavaFuture<POS, POS>(
                this.storage.append(new Record<byte[], byte[]>(key, valueBytes, recordTimestamp))) {

            @Override
            public POS map(POS appendedAt) {
                final ByteBuffer wrappedKey = ByteBuffer.wrap(key);
                kvstore.put(wrappedKey, kvstore.wrap(valueBytes, recordTimestamp));
                Log.this.updateCheckpoint(appendedAt);
                return appendedAt;
            }
        };
    }

    /**
     * Deletes a key in the storage and, once the storage reports the position of the
     * deletion, removes the key from the given store and advances the checkpoint.
     *
     * @param kvstore store updated when the delete completes
     * @param key     key to delete
     * @return a future yielding the position reported by the storage for the deletion
     * @throws IllegalStateException if the log has not been bootstrapped yet
     */
    public Future<POS> delete(final MemStore kvstore, final byte[] key) {
        this.fsmEnterWriteState();
        return new MappedJavaFuture<POS, POS>(this.storage.delete(key)) {

            @Override
            public POS map(POS deletedAt) {
                final ByteBuffer wrappedKey = ByteBuffer.wrap(key);
                kvstore.remove(wrappedKey);
                Log.this.updateCheckpoint(deletedAt);
                return deletedAt;
            }
        };
    }

    /**
     * Brings the given store up to date by replaying storage entries that lie after the
     * current checkpoint, then moves the checkpoint to the end offset reported by the storage.
     *
     * <p>A running log sync is stopped, or pending writes are flushed, before the state
     * machine enters {@code BOOT}.
     *
     * @param identifier      label used only in the log messages
     * @param kvstore         store that the replayed entries are applied to
     * @param partition       partition passed to the storage reset
     * @param observableState optional observer notified about every applied entry
     * @return the number of entries that were applied
     */
    public <K> long bootstrap(String identifier, MemStore kvstore, int partition, Optional<ObservableState<K>> observableState) {
        final FSM previous = this.fsm;
        if (previous == FSM.TAIL) {
            this.stopLogSync();
        } else if (previous == FSM.WRITE) {
            this.flushWrites();
        }
        this.fsm = FSM.BOOT;

        final POS startFrom = this.getCheckpoint();
        log.debug("Bootstrap " + identifier + " from checkpoint " + startFrom + ":end-offset");
        final long startedAt = EventTime.unix();
        final POS endOffset = this.storage.reset(partition, startFrom);

        long applied = 0L;
        if (endOffset != null) {
            for (Iterator<LogEntry<POS>> it = this.storage.boundedIterator(); it.hasNext(); ) {
                final LogEntry<POS> entry = it.next();
                // Entries at or before the starting checkpoint are skipped.
                if (startFrom == null || entry.position.compareTo(startFrom) > 0) {
                    this.modifyState(kvstore, entry, observableState);
                    applied++;
                }
            }
            this.updateCheckpoint(endOffset);
        }
        log.debug("Bootstrap - completed: " + identifier + ", new checkpoint= " + this.getCheckpoint() + ", duration.ms = " + (EventTime.unix() - startedAt));
        return applied;
    }

    public <K> void tail(final MemStore kvstore, final Optional<ObservableState<K>> observableState) {
        switch (this.fsm) {
            case INIT: {
                throw new IllegalStateException("Cannot transition from init to tail - bootstrap is required first");
            }
            case WRITE: {
                throw new IllegalStateException("Cannot transition from write mode directly from write to boot mode");
            }
        }
        this.fsm = FSM.TAIL;
        this.logsync.compareAndSet(null, new LogSync(){

            @Override
            public void run() {
                try {
                    while (!this.isInterrupted()) {
                        Iterator entries = Log.this.storage.fetch(true);
                        if (entries == null) {
                            return;
                        }
                        while (entries.hasNext()) {
                            LogEntry entry = entries.next();
                            Log.this.modifyState(kvstore, entry, observableState);
                        }
                    }
                }
                catch (Throwable e) {
                    log.error("Failure in the LogSync Thread", e);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void close() throws IOException {
                Log.this.storage.cancel();
                try {
                    log.trace("cancelling storage fetch operation and waiting for the logsync thread to complete..");
                    3 var1_1 = this;
                    synchronized (var1_1) {
                        this.join();
                    }
                }
                catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
        });
        this.logsync.get().start();
    }

    /**
     * Shuts the log down: stops the log sync thread when tailing, resets the state machine
     * to {@code INIT}, writes the checkpoint file when persistence is enabled and finally
     * marks the manager loop as stopped.
     *
     * <p>The later steps run even if an earlier one throws.
     *
     * @throws IOException if writing the checkpoint file fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (FSM.TAIL == this.fsm) {
                this.stopLogSync();
            }
        } finally {
            try {
                this.fsm = FSM.INIT;
                if (this.enabled) {
                    this.writeCheckpoint();
                }
            } finally {
                this.stopped = true;
            }
        }
    }

    /**
     * Manager loop: sleeps ten seconds at a time until stopped and, after each sleep,
     * writes the checkpoint file if persistence is enabled and the checkpoint has been
     * modified since the last write.
     *
     * <p>Any exception ends the loop, is logged, and interrupts the thread group of the
     * current thread.
     */
    @Override
    public void run() {
        try {
            this.stopped = false;
            while (!this.stopped) {
                TimeUnit.SECONDS.sleep(10L);
                final boolean dirty = this.enabled && this.checkpointModified;
                if (dirty) {
                    this.writeCheckpoint();
                }
            }
        } catch (Exception failure) {
            log.error("Error in the manager thread", (Throwable) failure);
            Thread.currentThread().getThreadGroup().interrupt();
        }
    }

    /**
     * Applies one log entry to the store and to the optional observer, then advances the
     * checkpoint to the position of the entry.
     *
     * <p>An entry the storage reports as a tombstone removes its key; any other entry is
     * put with its value and timestamp.
     *
     * @param kvstore         store to modify
     * @param entry           log entry to apply
     * @param observableState optional observer that receives the key and the optional value
     */
    private <K> void modifyState(MemStore kvstore, LogEntry<POS> entry, Optional<ObservableState<K>> observableState) {
        final boolean removal = this.storage.isTombstone(entry);
        if (!removal) {
            kvstore.put(ByteBuffer.wrap((byte[])entry.key), kvstore.wrap((byte[])entry.value, entry.timestamp));
            observableState.ifPresent(observer -> observer.internalPush((byte[])entry.key, Optional.of(entry.value)));
        } else {
            kvstore.remove(ByteBuffer.wrap((byte[])entry.key));
            observableState.ifPresent(observer -> observer.internalPush((byte[])entry.key, Optional.empty()));
        }
        this.updateCheckpoint(entry.position);
    }

    /** Delegates to the storage's {@code flush()}; used when leaving write mode for bootstrap. */
    private void flushWrites() {
        final LogStorage<POS> target = this.storage;
        target.flush();
    }

    /**
     * Uninstalls and closes the current log sync thread.
     *
     * <p>A failure to close the thread is logged as a warning and not propagated.
     *
     * @throws IllegalStateException if no log sync thread is installed
     */
    private void stopLogSync() {
        final LogSync running = this.logsync.get();
        if (running == null) {
            throw new IllegalStateException("Tail mode requires a running logsync thread");
        }
        // Clear the slot before closing, and only if it still holds the thread read above.
        this.logsync.compareAndSet(running, null);
        try {
            running.close();
        } catch (IOException closeFailure) {
            log.warn("could not close LogSync thread", (Throwable) closeFailure);
        }
    }

    /**
     * Advances the checkpoint to the given position if no checkpoint is set yet or the
     * position is greater than the current one; otherwise the checkpoint is left unchanged.
     *
     * <p>When the checkpoint advances and persistence is enabled, the modified flag is raised
     * so that the checkpoint file gets rewritten.
     *
     * @param position candidate position
     * @return the checkpoint value after the update
     */
    private POS updateCheckpoint(POS position) {
        return this.checkpoint.updateAndGet(current -> {
            if (log.isTraceEnabled()) {
                log.trace("updating checkpoint, offset: " + position);
            }
            final boolean advances = current == null || position.compareTo(current) > 0;
            if (!advances) {
                return current;
            }
            if (this.enabled) {
                this.checkpointModified = true;
            }
            return position;
        });
    }

    /**
     * Serializes the current checkpoint into the checkpoint file and clears the modified flag.
     *
     * <p>Both streams are closed even when serialization fails; in that case the modified
     * flag is left untouched.
     *
     * @throws IOException if the file cannot be opened or written
     */
    private void writeCheckpoint() throws IOException {
        final POS position = this.checkpoint.get();
        log.debug("Writing checkpoint " + position + " to file: " + this.checkpointFile);
        // Two resources so the file stream is released even if the object stream cannot be constructed.
        try (FileOutputStream fos = new FileOutputStream(this.checkpointFile.toFile());
             ObjectOutputStream oos = new ObjectOutputStream(fos)) {
            oos.writeObject(position);
        }
        this.checkpointModified = false;
    }

    /**
     * Base type of the tail-mode background worker: a {@link Thread} that is also
     * {@link Closeable}. It adds no behaviour of its own; subclasses supply
     * {@code run()} and {@code close()}.
     */
    private abstract class LogSync extends Thread implements Closeable {

        /** Private so that only code inside the enclosing class can create subclasses. */
        private LogSync() {
            super();
        }
    }

    private static enum FSM {
        INIT,
        BOOT,
        WRITE,
        TAIL;

    }
}

