/*
 * Decompiled with CFR 0.152.
 */
package net.scattersphere.job.stream;

import java.io.File;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.mapdb.Atomic;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide registry of named byte-array streams persisted in a MapDB file store.
 *
 * <p>For every stream ID the registry keeps three named MapDB records: a queue named after the
 * ID itself, an atomic boolean named {@code <id>-status} (true while the stream is open) and an
 * atomic integer named {@code <id>-size} that is incremented by {@link #write} and decremented by
 * {@link #decrementStreamSize}.
 *
 * <p>The store is opened once, from the file {@code _scattersphereStore}, when the singleton is
 * created. If that fails the error is logged and every later operation throws a
 * {@link NullPointerException}.
 */
public class StreamRegistry {
    // Must be declared before `instance`: static initialisers run in textual order and the
    // constructor logs through LOG when the store cannot be opened.
    private static final Logger LOG = LoggerFactory.getLogger(StreamRegistry.class);
    private static final String STATUS_KEY = "-status";
    private static final String SIZE_KEY = "-size";
    private static final StreamRegistry instance = new StreamRegistry();

    /** Backing store; {@code null} when it could not be opened at construction time. */
    private final DB db;

    /**
     * Opens the file-backed store. A failure is logged rather than propagated so that class
     * initialisation does not fail; the registry is then unusable.
     */
    private StreamRegistry() {
        DB store = null;
        try {
            store = DBMaker.newFileDB(new File("_scattersphereStore")).make();
        }
        catch (Exception ex) {
            LOG.error("Unable to create a store for Scattersphere Streaming.", ex);
        }
        this.db = store;
    }

    /**
     * Returns the shared registry instance.
     *
     * @return the singleton {@code StreamRegistry}
     */
    public static StreamRegistry instance() {
        return instance;
    }

    /**
     * Marks a stream as open by setting its status flag to {@code true}.
     *
     * <p>This method does not call {@link #commit()} itself.
     * NOTE(review): {@link #closeStream} commits but this method does not — confirm that the
     * difference is intentional.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public void openStream(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        Atomic.Boolean status = store.getAtomicBoolean(streamId + STATUS_KEY);
        status.set(true);
        LOG.info("Opened stream: ID={} status={}", streamId, status.get());
    }

    /**
     * Returns the queue that holds the stream's data.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @return the queue the store returns for {@code streamId}
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public BlockingQueue<byte[]> getStream(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        return store.getQueue(streamId);
    }

    /**
     * Reports whether the store returns a queue for the given stream ID.
     *
     * <p>NOTE(review): if the MapDB version in use creates a missing queue on demand inside
     * {@code getQueue}, this method can never return {@code false} — confirm against the
     * library version.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @return {@code true} if the store returned a non-null queue
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public boolean exists(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        BlockingQueue<byte[]> queue = store.getQueue(streamId);
        return queue != null;
    }

    /**
     * Reports whether the stream's status flag is currently {@code false}.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @return {@code true} if the status flag is not set
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public boolean isClosed(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        Atomic.Boolean status = store.getAtomicBoolean(streamId + STATUS_KEY);
        return !status.get();
    }

    /**
     * Marks a stream as closed by setting its status flag to {@code false}, then commits.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public void closeStream(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        Atomic.Boolean status = store.getAtomicBoolean(streamId + STATUS_KEY);
        status.set(false);
        this.commit();
        LOG.info("Closed stream: ID={} status={}", streamId, status.get());
    }

    /**
     * Summarises the state of a stream as a string.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @return {@code "ACTIVE"} if the status flag is set; otherwise {@code "EMPTY"} if the size
     *         counter is zero, else {@code "INACTIVE"}; {@code "NOT FOUND"} if the store returns
     *         no queue for the ID (see the review note on {@link #exists})
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public String getStatus(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        BlockingQueue<byte[]> queue = store.getQueue(streamId);
        if (queue == null) {
            return "NOT FOUND";
        }
        Atomic.Boolean status = store.getAtomicBoolean(streamId + STATUS_KEY);
        if (status.get()) {
            return "ACTIVE";
        }
        return this.isEmpty(streamId) ? "EMPTY" : "INACTIVE";
    }

    /**
     * Appends a chunk of data to the stream's queue, increments the size counter and commits.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @param data payload to enqueue, must not be {@code null}
     * @throws NullPointerException if the store is unavailable or an argument is {@code null}
     */
    public void write(String streamId, byte[] data) {
        Objects.requireNonNull(streamId, "streamId");
        Objects.requireNonNull(data, "data");
        BlockingQueue<byte[]> queue = this.getStream(streamId);
        queue.add(data);
        Atomic.Integer counter = this.requireStore().getAtomicInteger(streamId + SIZE_KEY);
        counter.incrementAndGet();
        this.commit();
    }

    /**
     * Returns the current value of the stream's size counter.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @return the value of the {@code <id>-size} counter
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public int getSize(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        Atomic.Integer counter = store.getAtomicInteger(streamId + SIZE_KEY);
        return counter.get();
    }

    /**
     * Reports whether the stream's size counter is zero.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @return {@code true} if {@link #getSize} returns 0
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public boolean isEmpty(String streamId) {
        return this.getSize(streamId) == 0;
    }

    /**
     * Decrements the stream's size counter and commits, unless the counter is already zero.
     *
     * <p>NOTE(review): the zero check and the decrement are two separate operations; concurrent
     * callers could presumably drive the counter below zero — confirm whether callers are
     * serialised.
     *
     * @param streamId stream identifier, must not be {@code null}
     * @throws NullPointerException if the store is unavailable or {@code streamId} is {@code null}
     */
    public void decrementStreamSize(String streamId) {
        DB store = this.requireStore();
        Objects.requireNonNull(streamId, "streamId");
        Atomic.Integer counter = store.getAtomicInteger(streamId + SIZE_KEY);
        if (counter.get() == 0) {
            return;
        }
        counter.decrementAndGet();
        this.commit();
    }

    /**
     * Commits pending changes to the backing store.
     *
     * @throws NullPointerException if the store is unavailable
     */
    public void commit() {
        this.requireStore().commit();
    }

    /**
     * Returns the backing store, failing with a descriptive message when it was never opened.
     */
    private DB requireStore() {
        return Objects.requireNonNull(this.db, "Scattersphere stream store is not available");
    }
}

