/*
 * Decompiled with CFR 0.152.
 */
package io.amient.affinity.core.storage;

import io.amient.affinity.core.serde.AbstractSerde;
import io.amient.affinity.core.storage.Log;
import io.amient.affinity.core.storage.MemStore;
import io.amient.affinity.core.storage.ObservableState;
import io.amient.affinity.core.storage.Record;
import io.amient.affinity.core.util.CloseableIterator;
import io.amient.affinity.core.util.CompletedJavaFuture;
import io.amient.affinity.core.util.EventTime;
import io.amient.affinity.core.util.MappedJavaFuture;
import io.amient.affinity.core.util.TimeRange;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class State_Java_Refactor<K, V>
extends ObservableState<K>
implements Closeable {
    private final String identifier;
    private final MemStore kvstore;
    private final Optional<Log<?>> logOption;
    private final int partition;
    private final Class<K> keyClass;
    private final AbstractSerde<K> keySerde;
    private final AbstractSerde<V> valueSerde;
    private final long ttlMs;
    private final int lockTimeoutMs;
    private final boolean external;
    private ConcurrentHashMap<K, Long> locks = new ConcurrentHashMap();

    public State_Java_Refactor(String identifier, MemStore kvstore, Optional<Log<?>> logOption, int partition, Class<K> keyClass, AbstractSerde<K> keySerde, AbstractSerde<V> valueSerde, long ttlMs, int lockTimeoutMs, boolean external) {
        this.identifier = identifier;
        this.kvstore = kvstore;
        this.logOption = logOption;
        this.partition = partition;
        this.keyClass = keyClass;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.ttlMs = ttlMs;
        this.lockTimeoutMs = lockTimeoutMs;
        this.external = external;
    }

    void boot() {
        this.logOption.ifPresent(log -> log.bootstrap(this.identifier, this.kvstore, this.partition, this.external ? Optional.of(this) : Optional.empty()));
    }

    void tail() {
        this.logOption.ifPresent(log -> log.tail(this.kvstore, Optional.of(this)));
    }

    @Override
    public void close() throws IOException {
        try {
            if (this.logOption.isPresent()) {
                this.logOption.get().close();
            }
        }
        finally {
            this.kvstore.close();
        }
    }

    public Long numKeys() {
        return this.kvstore.numKeys();
    }

    public CloseableIterator<Record<K, V>> iterator() {
        return this.iterator(TimeRange.UNBOUNDED, new Object[0]);
    }

    public CloseableIterator<Record<K, V>> iterator(final TimeRange range, Object ... prefix) {
        final ByteBuffer bytePrefix = prefix.length == 0 ? null : ByteBuffer.wrap(this.keySerde.prefix(this.keyClass, prefix));
        return new CloseableIterator<Record<K, V>>(){
            CloseableIterator<Map.Entry<ByteBuffer, ByteBuffer>> underlying;
            private Record<K, V> current;
            {
                this.underlying = State_Java_Refactor.this.kvstore.iterator(bytePrefix);
                this.current = null;
            }

            @Override
            public boolean hasNext() {
                if (this.current != null) {
                    return true;
                }
                while (this.underlying.hasNext()) {
                    Map.Entry entry = (Map.Entry)this.underlying.next();
                    Optional<Record<byte[], byte[]>> value = State_Java_Refactor.this.kvstore.unwrap((ByteBuffer)entry.getKey(), (ByteBuffer)entry.getValue(), State_Java_Refactor.this.ttlMs);
                    if (!value.isPresent()) continue;
                    Record<byte[], byte[]> record = value.get();
                    if (!range.contains(record.timestamp)) continue;
                    Object key = State_Java_Refactor.this.keySerde.fromBytes((byte[])record.key);
                    this.current = new Record(key, State_Java_Refactor.this.valueSerde.fromBytes((byte[])record.value), record.timestamp);
                    return true;
                }
                return false;
            }

            @Override
            public Record<K, V> next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                Record result = this.current;
                this.current = null;
                return result;
            }

            @Override
            public void close() throws IOException {
                this.underlying.close();
            }
        };
    }

    Optional<V> apply(K key) {
        return this.apply(ByteBuffer.wrap(this.keySerde.toBytes(key)));
    }

    private Optional<V> apply(ByteBuffer key) {
        return this.kvstore.apply(key).flatMap(cell -> this.kvstore.unwrap(key, (ByteBuffer)cell, this.ttlMs).map(byteRecord -> this.valueSerde.fromBytes((byte[])byteRecord.value)));
    }

    public Map<K, V> range(TimeRange range, Object prefix1, Object ... prefixN) throws IOException {
        LinkedHashMap result = new LinkedHashMap();
        try (CloseableIterator<Record<K, V>> it = this.iterator(range, prefix1, prefixN);){
            while (it.hasNext()) {
                Record record = (Record)it.next();
                result.put(record.key, record.value);
            }
        }
        return result;
    }

    public Future<Optional<V>> replace(final K key, final V value) {
        return new MappedJavaFuture<Optional<V>, Optional<V>>(this.put(this.keySerde.toBytes(key), value)){

            @Override
            public Optional<V> map(Optional<V> result) {
                State_Java_Refactor.this.push(key, value);
                return result;
            }
        };
    }

    public Future<Optional<V>> delete(final K key) {
        return new MappedJavaFuture<Optional<V>, Optional<V>>(this.delete(this.keySerde.toBytes(key))){

            @Override
            public Optional<V> map(Optional<V> result) {
                State_Java_Refactor.this.push(key, null);
                return result;
            }
        };
    }

    public Future<Optional<V>> update(K key, V value) {
        return this.getAndUpdate(key, current -> Optional.of(value));
    }

    public Future<Optional<V>> remove(K key) {
        return this.getAndUpdate(key, current -> Optional.empty());
    }

    public Future<V> insert(K key, V value) {
        return new MappedJavaFuture<Optional<V>, V>(this.updateAndGet(key, current -> {
            if (current.isPresent()) {
                throw new IllegalArgumentException(key + " already exists in state store");
            }
            return Optional.of(value);
        })){

            @Override
            public V map(Optional<V> result) {
                return result.get();
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Optional<V> get(K key) throws TimeoutException, InterruptedException {
        Long l = this.lock(key);
        try {
            Optional<V> optional = this.apply(key);
            return optional;
        }
        finally {
            this.unlock(key, l);
        }
    }

    public Future<Optional<V>> getAndUpdate(final K key, Function<Optional<V>, Optional<V>> f) {
        byte[] k = this.keySerde.toBytes(key);
        final Long l = this.lock(key);
        try {
            final Optional currentValue = this.apply(ByteBuffer.wrap(k));
            final Optional<V> updatedValue = f.apply(currentValue);
            if (currentValue.equals(updatedValue)) {
                this.unlock(key, l);
                return new CompletedJavaFuture<Optional<V>>(() -> currentValue);
            }
            return new MappedJavaFuture(updatedValue.isPresent() ? this.put(k, updatedValue.get()) : this.delete(k)){

                public Optional<V> map(Object position) {
                    State_Java_Refactor.this.unlock(key, l);
                    State_Java_Refactor.this.push(key, updatedValue);
                    return currentValue;
                }

                public Boolean recover(Throwable e) throws Throwable {
                    State_Java_Refactor.this.unlock(key, l);
                    throw e;
                }
            };
        }
        catch (Throwable e) {
            try {
                this.unlock(key, l);
                throw e;
            }
            catch (Throwable e2) {
                return new CompletedJavaFuture<Optional<V>>(() -> {
                    throw new RuntimeException(e2);
                });
            }
        }
    }

    public Future<Optional<V>> updateAndGet(final K key, Function<Optional<V>, Optional<V>> f) {
        byte[] k = this.keySerde.toBytes(key);
        final Long l = this.lock(key);
        try {
            Optional currentValue = this.apply(ByteBuffer.wrap(k));
            final Optional<V> updatedValue = f.apply(currentValue);
            if (currentValue.equals(updatedValue)) {
                this.unlock(key, l);
                return new CompletedJavaFuture<Optional<V>>(() -> currentValue);
            }
            return new MappedJavaFuture(updatedValue.isPresent() ? this.put(k, updatedValue.get()) : this.delete(k)){

                public Optional<V> map(Object position) {
                    State_Java_Refactor.this.unlock(key, l);
                    State_Java_Refactor.this.push(key, updatedValue);
                    return updatedValue;
                }

                public Boolean recover(Throwable e) throws Throwable {
                    State_Java_Refactor.this.unlock(key, l);
                    throw e;
                }
            };
        }
        catch (Throwable e) {
            try {
                this.unlock(key, l);
                throw e;
            }
            catch (Throwable e2) {
                return new CompletedJavaFuture<Optional<V>>(() -> {
                    throw new RuntimeException(e2);
                });
            }
        }
    }

    private Future<Optional<V>> put(byte[] key, final V value) {
        long recordTimestamp;
        if (this.external) {
            throw new IllegalStateException("put() called on a read-only state");
        }
        long nowMs = System.currentTimeMillis();
        long l = recordTimestamp = value instanceof EventTime ? ((EventTime)value).eventTimeUnix() : nowMs;
        if (this.ttlMs > 0L && recordTimestamp + this.ttlMs < nowMs) {
            return this.delete(key);
        }
        byte[] valueBytes = this.valueSerde.toBytes(value);
        if (this.logOption.isPresent()) {
            return new MappedJavaFuture(this.logOption.get().append(this.kvstore, key, valueBytes, recordTimestamp)){

                public Optional<V> map(Object result) {
                    return Optional.of(value);
                }
            };
        }
        this.kvstore.put(ByteBuffer.wrap(key), this.kvstore.wrap(valueBytes, recordTimestamp));
        return new CompletedJavaFuture<Optional<V>>(Optional.of(value));
    }

    private Future<Optional<V>> delete(byte[] key) {
        if (this.external) {
            throw new IllegalStateException("delete() called on a read-only state");
        }
        if (this.logOption.isPresent()) {
            return new MappedJavaFuture(this.logOption.get().delete(this.kvstore, key)){

                public Optional<V> map(Object result) {
                    return Optional.empty();
                }
            };
        }
        this.kvstore.remove(ByteBuffer.wrap(key));
        return new CompletedJavaFuture<Optional<V>>(Optional.empty());
    }

    @Override
    public void internalPush(byte[] key, Optional<byte[]> value) {
        if (value.isPresent()) {
            this.push(this.keySerde.fromBytes(key), Optional.of(this.valueSerde.fromBytes(value.get())));
        } else {
            this.push(this.keySerde.fromBytes(key), Optional.empty());
        }
    }

    private void unlock(K key, Long l) {
        if (!this.locks.remove(key, l)) {
            throw new IllegalMonitorStateException(key + " is locked by another Thread");
        }
    }

    private Long lock(K key) throws TimeoutException, InterruptedException {
        long l = Thread.currentThread().getId();
        int counter = 0;
        long start = System.currentTimeMillis();
        while (this.locks.putIfAbsent(key, l) != null) {
            long sleepTime;
            if ((sleepTime = Math.round(Math.log(++counter))) <= 0L) continue;
            if (System.currentTimeMillis() - start > (long)this.lockTimeoutMs) {
                throw new TimeoutException("Could not acquire lock for " + key + " in " + this.lockTimeoutMs + " ms");
            }
            Thread.sleep(sleepTime);
        }
        return l;
    }
}

