/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.aggregation;

import io.datakernel.aggregation.AggregationChunkStorage;
import io.datakernel.aggregation.AggregationUtils;
import io.datakernel.aggregation.ChunkIdCodec;
import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.async.service.EventloopService;
import io.datakernel.async.util.LogUtils;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.common.Initializable;
import io.datakernel.common.MemSize;
import io.datakernel.common.collection.CollectionUtils;
import io.datakernel.common.ref.RefInt;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.dsl.ChannelSupplierTransformer;
import io.datakernel.csp.process.ChannelByteChunker;
import io.datakernel.csp.process.ChannelLZ4Compressor;
import io.datakernel.csp.process.ChannelLZ4Decompressor;
import io.datakernel.datastream.StreamConsumer;
import io.datakernel.datastream.StreamSupplier;
import io.datakernel.datastream.StreamSupplierTransformer;
import io.datakernel.datastream.csp.ChannelDeserializer;
import io.datakernel.datastream.csp.ChannelSerializer;
import io.datakernel.datastream.stats.StreamStats;
import io.datakernel.datastream.stats.StreamStatsBasic;
import io.datakernel.datastream.stats.StreamStatsDetailed;
import io.datakernel.datastream.stats.StreamStatsSizeCounter;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.jmx.EventloopJmxMBeanEx;
import io.datakernel.eventloop.jmx.ExceptionStats;
import io.datakernel.eventloop.jmx.ValueStats;
import io.datakernel.eventloop.util.ReflectionUtils;
import io.datakernel.jmx.api.JmxAttribute;
import io.datakernel.jmx.api.JmxOperation;
import io.datakernel.ot.util.IdGenerator;
import io.datakernel.promise.Promise;
import io.datakernel.promise.Promises;
import io.datakernel.promise.jmx.PromiseStats;
import io.datakernel.remotefs.FileMetadata;
import io.datakernel.remotefs.FsClient;
import java.io.File;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AggregationChunkStorage} that keeps every chunk as a separate file behind an {@link FsClient}.
 *
 * <p>As implemented below, a chunk is first uploaded to {@code <name>.temp} (see {@link #write}),
 * renamed to {@code <name>.log} by {@link #finish}, and read back from {@code <name>.log} by
 * {@link #read}. {@code <name>} is produced by the configured {@link ChunkIdCodec}.
 * The class also exposes a number of JMX attributes describing the operations it performs.
 *
 * @param <C> type of chunk identifiers
 */
public final class RemoteFsChunkStorage<C>
implements AggregationChunkStorage<C>,
EventloopService,
Initializable<RemoteFsChunkStorage<C>>,
EventloopJmxMBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(RemoteFsChunkStorage.class);

    /** Buffer size used unless {@link #withBufferSize} is called. */
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256L);
    /** Smoothing window passed to every {@link ValueStats} / {@link PromiseStats} created by this class. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5L);
    /** Backup directory used unless {@link #withBackupPath} is called. */
    public static final String DEFAULT_BACKUP_FOLDER_NAME = "backups";
    /** Suffix of finished chunk files. */
    public static final String LOG = ".log";
    /** Suffix of chunk files that are still being written. */
    public static final String TEMP_LOG = ".temp";

    private final Eventloop eventloop;
    private final ChunkIdCodec<C> chunkIdCodec;
    private final IdGenerator<C> idGenerator;
    private final FsClient client;

    private String backupDir = DEFAULT_BACKUP_FOLDER_NAME;
    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;

    // Statistics of the individual asynchronous operations, exposed through the JMX getters below.
    private final ValueStats chunksCount = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseIdGenerator = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenR = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseOpenW = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseFinishChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseList = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseBackup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupCheckRequiredChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);

    // When true, the "detailed" (de)serialization stats are attached to streams instead of the basic ones.
    private boolean detailed;

    // Per-stage stream statistics of the read and write pipelines.
    private final StreamStatsDetailed<ByteBuf> readFile = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> readDecompress = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsBasic<?> readDeserialize = StreamStats.basic();
    private final StreamStatsDetailed<?> readDeserializeDetailed = StreamStats.detailed();
    private final StreamStatsBasic<?> writeSerialize = StreamStats.basic();
    private final StreamStatsDetailed<?> writeSerializeDetailed = StreamStats.detailed();
    private final StreamStatsDetailed<ByteBuf> writeCompress = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeChunker = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeFile = StreamStats.detailed((StreamStatsSizeCounter)StreamStatsSizeCounter.forByteBufs());

    // Counters describing the most recent cleanup() / finish() calls (the "*Total" ones accumulate).
    private final ExceptionStats cleanupWarnings = ExceptionStats.create();
    private int cleanupPreservedFiles;
    private int cleanupDeletedFiles;
    private int cleanupDeletedFilesTotal;
    private int cleanupSkippedFiles;
    private int cleanupSkippedFilesTotal;
    private int finishChunks;

    private RemoteFsChunkStorage(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, FsClient client) {
        this.eventloop = eventloop;
        this.chunkIdCodec = chunkIdCodec;
        this.idGenerator = idGenerator;
        this.client = client;
    }

    /**
     * Creates a storage with the default buffer size and backup directory.
     *
     * @param eventloop    event loop returned by {@link #getEventloop()}
     * @param chunkIdCodec converts chunk ids to file names and back
     * @param idGenerator  source of new chunk ids for {@link #createId()}
     * @param client       file system client holding the chunk files
     * @param <C>          type of chunk identifiers
     * @return a new storage instance
     */
    public static <C> RemoteFsChunkStorage<C> create(Eventloop eventloop, ChunkIdCodec<C> chunkIdCodec, IdGenerator<C> idGenerator, FsClient client) {
        return new RemoteFsChunkStorage<>(eventloop, chunkIdCodec, idGenerator, client);
    }

    /**
     * Sets the buffer size used by the write pipeline (serializer initial buffer and chunker bounds).
     *
     * @return this instance, for chaining
     */
    public RemoteFsChunkStorage<C> withBufferSize(MemSize bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }

    /**
     * Sets the directory under which {@link #backup} places its copies.
     *
     * @return this instance, for chaining
     */
    public RemoteFsChunkStorage<C> withBackupPath(String backupDir) {
        this.backupDir = backupDir;
        return this;
    }

    /** Name of the finished file of the given chunk. */
    private String getPath(C chunkId) {
        return this.toFileName(chunkId) + LOG;
    }

    /** Name of the in-progress (not yet finished) file of the given chunk. */
    private String getTempPath(C chunkId) {
        return this.toFileName(chunkId) + TEMP_LOG;
    }

    private String toFileName(C chunkId) {
        return this.chunkIdCodec.toFileName(chunkId);
    }

    private C fromFileName(String fileName) {
        return this.chunkIdCodec.fromFileName(fileName);
    }

    /**
     * Opens the finished file of a chunk for reading.
     *
     * <p>The downloaded byte stream is passed through LZ4 decompression and then deserialized with the
     * binary serializer built by {@link AggregationUtils#createBinarySerializer}; stream statistics are
     * attached after each stage.
     *
     * @return promise of a supplier of the deserialized records
     */
    @Override
    public <T> Promise<StreamSupplier<T>> read(AggregationStructure aggregation, List<String> fields, Class<T> recordClass, C chunkId, DefiningClassLoader classLoader) {
        return this.client.download(this.getPath(chunkId))
                .whenComplete(this.promiseOpenR.recordStats())
                .map(supplier -> ((StreamSupplier)((StreamSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)supplier
                        .transformWith(this.readFile))
                        .transformWith((ChannelSupplierTransformer)ChannelLZ4Decompressor.create()))
                        .transformWith(this.readDecompress))
                        .transformWith((ChannelSupplierTransformer)ChannelDeserializer.create(AggregationUtils.createBinarySerializer(aggregation, recordClass, aggregation.getKeys(), fields, classLoader))))
                        .transformWith(this.detailed ? this.readDeserializeDetailed : this.readDeserialize))
                        .withLateBinding());
    }

    /**
     * Opens the temporary file of a chunk for writing.
     *
     * <p>Records are serialized, LZ4-compressed and re-chunked (between half and double the configured
     * buffer size) before being streamed to the upload consumer; stream statistics are attached around
     * each stage. The file only becomes visible under its final name after {@link #finish}.
     *
     * @return promise of a consumer accepting the records of the chunk
     */
    @Override
    public <T> Promise<StreamConsumer<T>> write(AggregationStructure aggregation, List<String> fields, Class<T> recordClass, C chunkId, DefiningClassLoader classLoader) {
        return this.client.upload(this.getTempPath(chunkId))
                .whenComplete(this.promiseOpenW.recordStats())
                .map(consumer -> StreamConsumer.ofSupplier(supplier -> ((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((ChannelSupplier)((StreamSupplier)supplier
                        .transformWith(this.detailed ? this.writeSerializeDetailed : this.writeSerialize))
                        .transformWith((StreamSupplierTransformer)ChannelSerializer.create(AggregationUtils.createBinarySerializer(aggregation, recordClass, aggregation.getKeys(), fields, classLoader)).withInitialBufferSize(this.bufferSize)))
                        .transformWith(this.writeCompress))
                        .transformWith((ChannelSupplierTransformer)ChannelLZ4Compressor.createFastCompressor()))
                        .transformWith(this.writeChunker))
                        .transformWith((ChannelSupplierTransformer)ChannelByteChunker.create((MemSize)this.bufferSize.map(bytes -> bytes / 2L), (MemSize)this.bufferSize.map(bytes -> bytes * 2L))))
                        .transformWith(this.writeFile))
                        .streamTo(consumer)));
    }

    /**
     * Renames the temporary file of every given chunk to its final name.
     *
     * <p>NOTE(review): each move is converted with {@code toTry()} before being combined, which
     * presumably means a failed move does not fail the returned promise - confirm whether this
     * best-effort behaviour is intended.
     *
     * @param chunkIds ids of the chunks whose files should be finalized
     */
    @Override
    public Promise<Void> finish(Set<C> chunkIds) {
        this.finishChunks = chunkIds.size();
        return Promises.all(chunkIds.stream()
                        .map(id -> this.client.move(this.getTempPath(id), this.getPath(id)).toTry()))
                .whenComplete(this.promiseFinishChunks.recordStats());
    }

    /** Requests a new chunk id from the configured {@link IdGenerator}. */
    public Promise<C> createId() {
        return this.idGenerator.createId().whenComplete(this.promiseIdGenerator.recordStats());
    }

    /**
     * Copies the finished files of the given chunks into {@code <backupDir>/<backupId>}.
     *
     * <p>Files are first copied into {@code <backupDir>/<backupId>_tmp}; once every copy has
     * completed that directory is moved to its final name.
     *
     * @param backupId name of the backup (used as directory name)
     * @param chunkIds ids of the chunks to include
     */
    public Promise<Void> backup(String backupId, Set<C> chunkIds) {
        String finalBackupDir = this.backupDir + File.separator + backupId;
        String tempBackupDir = finalBackupDir + "_tmp";
        return Promises.all(chunkIds.stream().map(chunkId -> {
                    // Use the codec-derived name, the same one read()/finish() use for this chunk.
                    String fileName = this.getPath(chunkId);
                    return this.client.copy(fileName, tempBackupDir + File.separator + fileName);
                }))
                .then($ -> this.client.moveDir(tempBackupDir, finalBackupDir))
                .whenComplete(this.promiseBackup.recordStats());
    }

    /**
     * Deletes every finished chunk file that is not in {@code saveChunks}, regardless of its timestamp.
     */
    public Promise<Void> cleanup(Set<C> saveChunks) {
        return this.cleanup(saveChunks, null);
    }

    /**
     * Deletes finished chunk files that are not in {@code preserveChunks}.
     *
     * <p>When {@code instant} is given, only files whose timestamp is not later than it are deleted;
     * newer files are counted as skipped. Files whose name cannot be parsed are left alone and
     * recorded in the cleanup warnings. The cleanup counters are updated once all deletions succeed.
     *
     * @param preserveChunks ids of chunks that must be kept
     * @param instant        upper bound (inclusive) for the timestamp of deleted files, or {@code null} for no bound
     */
    public Promise<Void> cleanup(Set<C> preserveChunks, @Nullable Instant instant) {
        // Long.MAX_VALUE means "no bound": every file timestamp compares <= to it, so no sentinel check is needed.
        long timestamp = instant != null ? instant.toEpochMilli() : Long.MAX_VALUE;
        RefInt skipped = new RefInt(0);
        RefInt deleted = new RefInt(0);
        return this.client.list("*.log")
                .then(list -> Promises.all(list.stream()
                                .filter(file -> this.isCleanupCandidate(file, preserveChunks, timestamp, skipped))
                                .map(file -> {
                                    if (logger.isTraceEnabled()) {
                                        FileTime lastModifiedTime = FileTime.fromMillis(file.getTimestamp());
                                        logger.trace("Delete file: {} with last modifiedTime: {}({} millis)", file.getName(), lastModifiedTime, lastModifiedTime.toMillis());
                                    }
                                    deleted.inc();
                                    return this.client.delete(file.getName());
                                }))
                        .whenResult($ -> {
                            this.cleanupPreservedFiles = preserveChunks.size();
                            this.cleanupDeletedFiles = deleted.get();
                            this.cleanupDeletedFilesTotal += deleted.get();
                            this.cleanupSkippedFiles = skipped.get();
                            this.cleanupSkippedFilesTotal += skipped.get();
                        }))
                .whenComplete(this.promiseCleanup.recordStats());
    }

    /**
     * Decides whether {@code file} should be deleted by {@link #cleanup(Set, Instant)}.
     * Increments {@code skipped} for files rejected only because they are newer than {@code timestamp}.
     */
    private boolean isCleanupCandidate(FileMetadata file, Set<C> preserveChunks, long timestamp, RefInt skipped) {
        C id;
        try {
            String filename = file.getName();
            id = this.fromFileName(filename.substring(0, filename.length() - LOG.length()));
        }
        catch (NumberFormatException e) {
            this.cleanupWarnings.recordException((Throwable)e);
            logger.warn("Invalid chunk filename: {}", file);
            return false;
        }
        if (preserveChunks.contains(id)) {
            return false;
        }
        long fileTimestamp = file.getTimestamp();
        if (fileTimestamp <= timestamp) {
            return true;
        }
        logger.trace("File {} timestamp {} > {}", file, fileTimestamp, timestamp);
        skipped.inc();
        return false;
    }

    /**
     * Lists the finished chunk files accepted by both predicates.
     *
     * <p>NOTE(review): names are parsed with {@link Long#parseLong} rather than through the
     * {@link ChunkIdCodec}, so this presumably only works for numeric file names - confirm.
     *
     * @param filter       predicate applied to the file name (including the {@code .log} suffix)
     * @param lastModified predicate applied to the file timestamp
     * @return promise of the numeric names (suffix stripped) of the matching files
     */
    public Promise<Set<Long>> list(Predicate<String> filter, Predicate<Long> lastModified) {
        return this.client.list("*.log")
                .map(list -> list.stream()
                        .filter(file -> lastModified.test(file.getTimestamp()))
                        .map(FileMetadata::getName)
                        .filter(filter)
                        .map(name -> Long.parseLong(name.substring(0, name.length() - LOG.length())))
                        .collect(Collectors.toSet()))
                .whenComplete(this.promiseList.recordStats());
    }

    /**
     * Verifies that every required chunk has a finished file in the storage and records the number
     * of listed chunks.
     *
     * <p>NOTE(review): the listed ids are {@code Long}s while {@code requiredChunks} holds {@code C};
     * the containment check presumably only succeeds when {@code C} is {@code Long} - confirm.
     *
     * @param requiredChunks ids of the chunks that must be present
     * @return promise that fails with {@link IllegalStateException} listing the missing chunks
     */
    public Promise<Void> checkRequiredChunks(Set<C> requiredChunks) {
        return this.list(s -> true, timestamp -> true)
                .whenResult(actualChunks -> this.chunksCount.recordValue(actualChunks.size()))
                .then(actualChunks -> actualChunks.containsAll(requiredChunks) ? Promise.of((Object)null) : Promise.ofException((Throwable)new IllegalStateException("Missed chunks from storage: " + CollectionUtils.toLimitedString((Collection)CollectionUtils.difference((Set)requiredChunks, (Set)actualChunks), (int)100))))
                .whenComplete(this.promiseCleanupCheckRequiredChunks.recordStats())
                .whenComplete(LogUtils.toLogger((Logger)logger, (String)LogUtils.thisMethod(), (Object[])new Object[]{CollectionUtils.toLimitedString(requiredChunks, (int)6)}));
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    /** Starts the service by pinging the underlying {@link FsClient}. */
    @NotNull
    public Promise<Void> start() {
        return this.client.ping();
    }

    /** Stopping requires no work; returns an already completed promise. */
    @NotNull
    public Promise<Void> stop() {
        return Promise.complete();
    }

    @JmxAttribute
    public PromiseStats getPromiseIdGenerator() {
        return this.promiseIdGenerator;
    }

    @JmxAttribute
    public PromiseStats getPromiseFinishChunks() {
        return this.promiseFinishChunks;
    }

    @JmxAttribute
    public PromiseStats getPromiseBackup() {
        return this.promiseBackup;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return this.promiseCleanup;
    }

    @JmxAttribute
    public PromiseStats getPromiseList() {
        return this.promiseList;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenR() {
        return this.promiseOpenR;
    }

    @JmxAttribute
    public PromiseStats getPromiseOpenW() {
        return this.promiseOpenW;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadFile() {
        return this.readFile;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDecompress() {
        return this.readDecompress;
    }

    @JmxAttribute
    public StreamStatsBasic getReadDeserialize() {
        return this.readDeserialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDeserializeDetailed() {
        return this.readDeserializeDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getWriteSerialize() {
        return this.writeSerialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteSerializeDetailed() {
        return this.writeSerializeDetailed;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteCompress() {
        return this.writeCompress;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteChunker() {
        return this.writeChunker;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteFile() {
        return this.writeFile;
    }

    /** Number of chunk ids passed to the most recent {@link #finish} call. */
    @JmxAttribute
    public int getFinishChunks() {
        return this.finishChunks;
    }

    @JmxAttribute
    public ExceptionStats getCleanupWarnings() {
        return this.cleanupWarnings;
    }

    @JmxAttribute
    public int getCleanupPreservedFiles() {
        return this.cleanupPreservedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFiles() {
        return this.cleanupDeletedFiles;
    }

    @JmxAttribute
    public int getCleanupDeletedFilesTotal() {
        return this.cleanupDeletedFilesTotal;
    }

    @JmxAttribute
    public int getCleanupSkippedFiles() {
        return this.cleanupSkippedFiles;
    }

    @JmxAttribute
    public int getCleanupSkippedFilesTotal() {
        return this.cleanupSkippedFilesTotal;
    }

    @JmxAttribute
    public ValueStats getChunksCount() {
        return this.chunksCount;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupCheckRequiredChunks() {
        return this.promiseCleanupCheckRequiredChunks;
    }

    /** Switches the (de)serialization stages of subsequently opened streams to the detailed stats. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    /** Switches the (de)serialization stages of subsequently opened streams back to the basic stats. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }

    /** Zeroes the cleanup counters and delegates the rest to {@link ReflectionUtils#resetStats}. */
    public void resetStats() {
        this.cleanupPreservedFiles = 0;
        this.cleanupDeletedFiles = 0;
        this.cleanupDeletedFilesTotal = 0;
        this.cleanupSkippedFiles = 0;
        this.cleanupSkippedFilesTotal = 0;
        ReflectionUtils.resetStats((Object)this);
    }
}

