/*
 * Decompiled with CFR 0.152.
 */
package io.amient.affinity.core.util;

import io.amient.affinity.core.storage.LogEntry;
import io.amient.affinity.core.storage.LogStorage;
import io.amient.affinity.core.storage.LogStorageConf;
import io.amient.affinity.core.storage.Record;
import io.amient.affinity.core.util.CompletedJavaFuture;
import io.amient.affinity.core.util.EventTime;
import io.amient.affinity.core.util.JavaPromise;
import io.amient.affinity.core.util.TimeRange;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class MemoryLogStorage
implements LogStorage<Long> {
    private AtomicLong logEndOffset = new AtomicLong(-1L);
    private AtomicLong fetchStopOffset = new AtomicLong(Long.MAX_VALUE);
    private ConcurrentLinkedQueue<WritePromise> unflushedWrites = new ConcurrentLinkedQueue();
    private ConcurrentHashMap<Long, LogEntry<Long>> internal = new ConcurrentHashMap();
    private Long position = 0L;
    private AtomicBoolean cancelled = new AtomicBoolean(false);

    public MemoryLogStorage(LogStorageConf conf) {
    }

    @Override
    public int getNumPartitions() {
        return 1;
    }

    @Override
    public void resume(TimeRange range) {
    }

    @Override
    public void reset(int partition, TimeRange range) {
        LogEntry<Long> b;
        this.position = 0L;
        while ((b = this.internal.get(this.position)) != null && b.timestamp < range.start) {
            this.position = this.position + 1L;
        }
        this.fetchStopOffset.set(this.logEndOffset.get());
        while ((b = this.internal.get(this.fetchStopOffset.get())) != null && b.timestamp >= range.end) {
            this.fetchStopOffset.decrementAndGet();
        }
    }

    @Override
    public Long reset(int partition, Long startPosition) {
        this.position = startPosition == null ? 0L : startPosition;
        long stopOffset = this.logEndOffset.get();
        this.fetchStopOffset.set(stopOffset);
        return stopOffset;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Iterator<LogEntry<Long>> fetch(boolean unbounded) throws InterruptedException {
        if (!unbounded && this.position > this.fetchStopOffset.get()) {
            return null;
        }
        while (this.position > this.logEndOffset.get()) {
            AtomicBoolean atomicBoolean = this.cancelled;
            synchronized (atomicBoolean) {
                this.cancelled.wait(100L);
                if (this.cancelled.get()) {
                    return null;
                }
            }
        }
        LogEntry<Long> result = this.internal.get(this.position);
        this.position = this.position + 1L;
        return Collections.singletonList(result).iterator();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void cancel() {
        this.cancelled.set(true);
        AtomicBoolean atomicBoolean = this.cancelled;
        synchronized (atomicBoolean) {
            this.cancelled.notifyAll();
        }
    }

    @Override
    public Future<Long> commit() {
        return new CompletedJavaFuture<Long>(() -> System.currentTimeMillis());
    }

    @Override
    public Future<Long> append(Record<byte[], byte[]> record) {
        WritePromise f = new WritePromise(record);
        this.unflushedWrites.add(f);
        return f;
    }

    @Override
    public Future<Long> delete(byte[] key) {
        return this.append(new Record<byte[], Object>(key, null, EventTime.unix()));
    }

    @Override
    public void flush() {
        while (!this.unflushedWrites.isEmpty()) {
            WritePromise write = this.unflushedWrites.poll();
            this.logEndOffset.updateAndGet(pos -> {
                long newPos = pos + 1L;
                LogEntry<Long> entry = new LogEntry<Long>(newPos, (byte[])write.record.key, (byte[])write.record.value, write.record.timestamp, write.record.value == null);
                this.internal.put(newPos, entry);
                write.success(newPos);
                return newPos;
            });
        }
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public String keySubject() {
        return null;
    }

    @Override
    public String valueSubject() {
        return null;
    }

    @Override
    public void ensureExists() {
    }

    @Override
    public void ensureCorrectConfiguration(long ttlMs, int numPartitions, boolean readonly) {
    }

    @Override
    public String getStats() {
        return "logEndOffset=" + this.logEndOffset.get() + "; fetchStopOffset=" + this.fetchStopOffset.get();
    }

    @Override
    public String getTopic() {
        return "default";
    }

    public final class WritePromise
    extends JavaPromise<Long> {
        public Record<byte[], byte[]> record;

        public WritePromise(Record<byte[], byte[]> record) {
            this.record = record;
        }
    }
}

