/*
 * Decompiled with CFR 0.152.
 */
package io.amient.affinity.core.storage;

import io.amient.affinity.core.config.CfgCls;
import io.amient.affinity.core.config.CfgPath;
import io.amient.affinity.core.config.CfgStruct;
import io.amient.affinity.core.storage.Checkpoint;
import io.amient.affinity.core.storage.CloseableIterator;
import io.amient.affinity.core.storage.StateConf;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for key-value stores that operate on {@link ByteBuffer} keys and values and
 * keep track of a {@link Checkpoint}, i.e. the highest offset passed to
 * {@link #put(ByteBuffer, ByteBuffer, long)} or {@link #remove(ByteBuffer, long)}.
 *
 * <p>When the implementation reports itself as persistent (see {@link #isPersistent()}) a
 * background manager thread, started by {@link #open()}, periodically writes the checkpoint
 * to a file inside {@link #dataDir}; {@link #close()} writes it one final time.
 *
 * <p>Values handled by {@link #wrap(byte[], long)} and
 * {@link #unwrap(ByteBuffer, ByteBuffer, long)} are laid out as an 8-byte big-endian
 * timestamp followed by the raw value bytes.
 */
public abstract class MemStore
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(MemStore.class);

    /** Size of the timestamp header that {@link #wrap(byte[], long)} puts in front of a value. */
    private static final int TIMESTAMP_BYTES = Long.BYTES;

    /** Pause between two checkpoint flush attempts of the manager thread, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 10000L;

    private final MemStoreManager manager;

    // NOTE: this initializer invokes an overridable method before any subclass field has been
    // initialized, so implementations of isPersistent() must not depend on their own state.
    private final boolean checkpointsEnable = this.isPersistent();

    /** Value of {@code conf.TtlSeconds}; stored for subclasses, not interpreted in this class. */
    protected final int ttlSecs;

    /** Directory holding the checkpoint file; {@code null} when the store is not persistent. */
    protected final Path dataDir;

    /**
     * Creates the store and, for persistent implementations, resolves and creates its data
     * directory ({@code <configured data dir>/<identifier>}).
     *
     * @param identifier name of the store, used as the sub-directory name and in error messages
     * @param conf       state configuration providing the TTL and the mem-store data directory
     * @throws IllegalArgumentException if the store is persistent but no data directory is configured
     * @throws IOException              if the data directory cannot be created
     */
    public MemStore(String identifier, StateConf conf) throws IOException {
        this.ttlSecs = (Integer)conf.TtlSeconds.apply();
        if (!this.checkpointsEnable) {
            this.dataDir = null;
        } else {
            if (!conf.MemStore.DataDir.isDefined()) {
                throw new IllegalArgumentException(conf.MemStore.DataDir.path() + " must be provided via affinity.node config for store: " + identifier);
            }
            this.dataDir = ((Path)conf.MemStore.DataDir.apply()).resolve(Paths.get(identifier));
            if (!Files.exists(this.dataDir)) {
                Files.createDirectories(this.dataDir);
            }
        }
        this.manager = new MemStoreManager();
    }

    /**
     * Tells whether this store keeps a checkpoint file. Called once, while the instance is
     * still being constructed, so the answer must not depend on subclass fields.
     *
     * @return {@code true} to enable the data directory and checkpoint persistence
     */
    protected abstract boolean isPersistent();

    /**
     * Loads the persisted checkpoint (if checkpoints are enabled and the file exists) and
     * starts the background manager thread.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the checkpoint file cannot be read
     */
    public final void open() {
        this.manager.start();
    }

    /**
     * @return the current checkpoint; its offset is {@code -1} until a checkpoint is loaded or updated
     */
    public Checkpoint getCheckpoint() {
        return this.manager.checkpoint.get();
    }

    /**
     * @return an iterator over the stored key/value pairs; the caller is expected to close it
     */
    public abstract CloseableIterator<Map.Entry<ByteBuffer, ByteBuffer>> iterator();

    /**
     * Looks up the value stored under the given key.
     *
     * @param var1 the key
     * @return the stored value, or an empty optional when the store has no value for the key
     */
    public abstract Optional<ByteBuffer> apply(ByteBuffer var1);

    /**
     * @return the number of keys in the store as reported by the implementation
     */
    public abstract long numKeys();

    /**
     * Stores the value and advances the checkpoint to {@code offset} if it is higher than the
     * current one.
     *
     * @param key    the key
     * @param value  the value to store
     * @param offset offset associated with this write
     * @return the checkpoint after the update
     */
    public final Checkpoint put(ByteBuffer key, ByteBuffer value, long offset) {
        this.putImpl(key, value);
        return this.manager.updateCheckpoint(offset);
    }

    /**
     * Implementation hook of {@link #put(ByteBuffer, ByteBuffer, long)}.
     *
     * @param var1 the key
     * @param var2 the value
     */
    protected abstract void putImpl(ByteBuffer var1, ByteBuffer var2);

    /**
     * Removes the key and advances the checkpoint to {@code offset} if it is higher than the
     * current one.
     *
     * @param key    the key to remove
     * @param offset offset associated with this removal
     * @return the checkpoint after the update
     */
    public final Checkpoint remove(ByteBuffer key, long offset) {
        this.removeImpl(key);
        return this.manager.updateCheckpoint(offset);
    }

    /**
     * Implementation hook of {@link #remove(ByteBuffer, long)}; also invoked by
     * {@link #unwrap(ByteBuffer, ByteBuffer, long)} for expired entries.
     *
     * @param var1 the key
     */
    protected abstract void removeImpl(ByteBuffer var1);

    /**
     * Stops the manager thread and, if checkpoints are enabled, writes the final checkpoint.
     * A failure to write the checkpoint is logged, not propagated.
     */
    @Override
    public void close() {
        try {
            this.manager.close();
        }
        catch (IOException e) {
            log.error("Failed to write final checkpoint", e);
        }
    }

    /**
     * Prefixes the value with its timestamp.
     *
     * @param value     raw value bytes
     * @param timestamp timestamp to record in the 8-byte big-endian header
     * @return a new buffer, positioned at 0, containing the timestamp followed by the value
     */
    public final ByteBuffer wrap(byte[] value, long timestamp) {
        ByteBuffer memStoreValue = ByteBuffer.allocate(TIMESTAMP_BYTES + value.length);
        memStoreValue.order(ByteOrder.BIG_ENDIAN);
        memStoreValue.putLong(timestamp);
        memStoreValue.put(value);
        memStoreValue.flip();
        return memStoreValue;
    }

    /**
     * Extracts the raw value from a buffer produced by {@link #wrap(byte[], long)}, applying
     * the time-to-live. The header is read at absolute index 0 and the value spans from index
     * 8 to the buffer's limit; the buffer's own position is left untouched.
     *
     * @param key              key of the entry; removed from the store if the entry has expired
     * @param valueAndMetadata timestamp-prefixed value
     * @param ttlMs            time-to-live in milliseconds; values {@code <= 0} disable expiration
     * @return the value bytes, or empty if the entry is older than {@code ttlMs}
     */
    public final Optional<byte[]> unwrap(ByteBuffer key, ByteBuffer valueAndMetadata, long ttlMs) {
        if (ttlMs > 0L && valueAndMetadata.getLong(0) + ttlMs < System.currentTimeMillis()) {
            this.removeImpl(key);
            return Optional.empty();
        }
        byte[] result = new byte[valueAndMetadata.limit() - TIMESTAMP_BYTES];
        // Read through a duplicate: the argument may be the very buffer held by the store and
        // moving its position would change what later readers (and equals/hashCode) see.
        ByteBuffer payload = valueAndMetadata.duplicate();
        payload.position(TIMESTAMP_BYTES);
        payload.get(result);
        return Optional.of(result);
    }

    /**
     * Byte-array convenience for {@link #remove(ByteBuffer, long)}.
     *
     * @param key    key bytes
     * @param offset offset associated with this removal
     */
    public final void unload(byte[] key, long offset) {
        this.remove(ByteBuffer.wrap(key), offset);
    }

    /**
     * Byte-array convenience that wraps the value with its timestamp and stores it via
     * {@link #put(ByteBuffer, ByteBuffer, long)}.
     *
     * @param key       key bytes
     * @param value     raw value bytes
     * @param offset    offset associated with this write
     * @param timestamp timestamp recorded in the value header
     */
    public final void load(byte[] key, byte[] value, long offset, long timestamp) {
        ByteBuffer valueBuffer = this.wrap(value, timestamp);
        this.put(ByteBuffer.wrap(key), valueBuffer, offset);
    }

    /**
     * Background thread that owns the checkpoint: it loads it on start, flushes it to the
     * checkpoint file every {@link #CHECKPOINT_INTERVAL_MS} when it has changed, and writes
     * it one last time on close.
     */
    private class MemStoreManager
    extends Thread
    implements Closeable {
        private final AtomicReference<Checkpoint> checkpoint = new AtomicReference<>(new Checkpoint(-1L));

        /** Guards {@link #stopped}, wakes the sleeping thread and serializes checkpoint file writes. */
        private final Object monitor = new Object();

        /** Set after the checkpoint advances, cleared right before a snapshot is written. */
        private volatile boolean checkpointModified = false;

        /** Guarded by {@link #monitor}; only ever goes from false to true so a stop request cannot be lost. */
        private boolean stopped = false;

        private MemStoreManager() {
        }

        /** @return the checkpoint file, named after the concrete store class, inside the data directory */
        private Path getFile() {
            return MemStore.this.dataDir.resolve(MemStore.this.getClass().getSimpleName() + ".checkpoint");
        }

        /**
         * Restores the checkpoint from its file when checkpoints are enabled and the file
         * exists, then starts the thread.
         */
        @Override
        public synchronized void start() {
            if (MemStore.this.checkpointsEnable) {
                try {
                    Path file = this.getFile();
                    if (Files.exists(file)) {
                        this.checkpoint.set(Checkpoint.readFromFile(file));
                    }
                    log.info("Initialized {} from {}", this.checkpoint.get(), file);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            super.start();
        }

        /**
         * Flush loop. Any failure (including an external interrupt) is logged and escalated
         * by interrupting the whole thread group, after which the thread terminates.
         */
        @Override
        public void run() {
            try {
                while (this.awaitNextFlush()) {
                    if (MemStore.this.checkpointsEnable && this.checkpointModified) {
                        this.writeCheckpoint();
                    }
                }
            }
            catch (Exception e) {
                log.error("Error in the manager thread", e);
                Thread.currentThread().getThreadGroup().interrupt();
            }
        }

        /**
         * Waits for the next flush cycle or for a stop request, whichever comes first.
         * A spurious wake-up merely results in an early, harmless flush check.
         *
         * @return {@code true} to run another cycle, {@code false} once the manager was closed
         */
        private boolean awaitNextFlush() throws InterruptedException {
            synchronized (this.monitor) {
                if (!this.stopped) {
                    this.monitor.wait(CHECKPOINT_INTERVAL_MS);
                }
                return !this.stopped;
            }
        }

        /** Writes the current checkpoint to the checkpoint file; writes never overlap. */
        private void writeCheckpoint() throws IOException {
            synchronized (this.monitor) {
                Path file = this.getFile();
                // Clear the flag BEFORE taking the snapshot: an update that lands after the
                // snapshot then re-arms the flag and is picked up by the next cycle.
                this.checkpointModified = false;
                Checkpoint chk = this.checkpoint.get();
                log.debug("Writing checkpoint {} to file: {}", chk, file);
                chk.writeToFile(file);
            }
        }

        /**
         * Advances the checkpoint to {@code offset} if that is higher than the current offset.
         *
         * @return the checkpoint in effect after the call
         */
        private Checkpoint updateCheckpoint(long offset) {
            if (log.isTraceEnabled()) {
                log.trace("updating checkpoint, offset: {}", offset);
            }
            while (true) {
                Checkpoint current = this.checkpoint.get();
                if (offset <= current.offset) {
                    return current;
                }
                Checkpoint advanced = new Checkpoint(offset);
                if (this.checkpoint.compareAndSet(current, advanced)) {
                    // Mark dirty only once the new checkpoint is actually visible to the writer.
                    if (MemStore.this.checkpointsEnable) {
                        this.checkpointModified = true;
                    }
                    return advanced;
                }
            }
        }

        /**
         * Requests the thread to stop, waking it up if it is waiting, and writes the final
         * checkpoint when checkpoints are enabled.
         */
        @Override
        public void close() throws IOException {
            synchronized (this.monitor) {
                this.stopped = true;
                this.monitor.notifyAll();
            }
            if (MemStore.this.checkpointsEnable) {
                this.writeCheckpoint();
            }
        }
    }

    /**
     * Configuration structure of a mem-store: the implementation class ({@code class}) and
     * the optional data directory ({@code data.dir}).
     */
    public static class MemStoreConf
    extends CfgStruct<MemStoreConf> {
        // NOTE(review): the trailing boolean presumably marks the setting as required - confirm in CfgStruct.
        public CfgCls<MemStore> Class = this.cls("class", MemStore.class, true);
        public CfgPath DataDir = this.filepath("data.dir", false);

        /** @return the names {@code mapdb} and {@code rocksdb}, passed to the config framework as specializations */
        @Override
        protected Set<String> specializations() {
            return new HashSet<String>(Arrays.asList("mapdb", "rocksdb"));
        }
    }
}

