/*
 * Decompiled with CFR 0.152.
 */
package net.scattersphere.server.handler.stream;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import net.scattersphere.data.DataSerializer;
import net.scattersphere.job.stream.StreamRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.net.NetSocket;

/**
 * Forwards the contents of a registered stream to a subscribed socket.
 *
 * <p>When run, the writer repeatedly takes a chunk from the queue obtained from
 * {@link StreamRegistry} for {@code streamId}, wraps it with
 * {@link DataSerializer#packetize}, and writes it to the endpoint. The loop ends when
 * {@link #stop()} has been called, when the stream is reported closed and empty after a
 * write, when the thread is interrupted, or when the endpoint or stream is missing.
 *
 * <p>Note that {@link #stop()} only flips a flag that is checked between chunks; it does
 * not wake a thread that is currently blocked waiting for the next chunk.
 */
public class SubscribedStreamWriter
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SubscribedStreamWriter.class);

    private final NetSocket endpoint;
    private final String streamId;
    private final BlockingQueue<byte[]> stream;
    private final AtomicBoolean isRunning;

    /**
     * Creates a writer for the given endpoint and stream.
     *
     * @param endpoint socket that receives the packetized chunks; if {@code null},
     *                 {@link #run()} logs and exits immediately
     * @param streamId identifier used to look the stream up in {@link StreamRegistry}
     */
    public SubscribedStreamWriter(NetSocket endpoint, String streamId) {
        this.endpoint = endpoint;
        this.streamId = streamId;
        this.isRunning = new AtomicBoolean(false);
        this.stream = StreamRegistry.instance().getStream(streamId);
    }

    /**
     * Pumps chunks from the stream to the endpoint until stopped, interrupted, or the
     * stream is closed and drained.
     */
    @Override
    public void run() {
        this.isRunning.set(true);
        while (this.isRunning.get()) {
            if (this.endpoint == null) {
                LOG.info("Stream {} endpoint is null", this.streamId);
                break;
            }
            // Explicit check instead of relying on a NullPointerException from take().
            if (this.stream == null) {
                LOG.info("Stream {} empty, closing connection to {}", this.streamId, this.endpoint);
                this.endpoint.close();
                break;
            }

            byte[] buffer;
            try {
                buffer = this.stream.take();
                StreamRegistry.instance().decrementStreamSize(this.streamId);
            }
            catch (InterruptedException ex) {
                // Restore the interrupt flag for whoever owns this thread, and stop here:
                // there is no chunk to send, so falling through would dereference null.
                Thread.currentThread().interrupt();
                LOG.info("Stream {} interrupted, closing connection to {}", this.streamId, this.endpoint);
                this.endpoint.close();
                // Mark as not running so the shutdown message below reports an abnormal exit.
                this.isRunning.set(false);
                break;
            }
            catch (NullPointerException ex) {
                // NOTE(review): kept from the original as a best-effort guard around the
                // registry call, whose implementation is not visible here - confirm
                // whether it can actually throw before removing.
                LOG.info("Stream {} empty, closing connection to {}", this.streamId, this.endpoint);
                this.endpoint.close();
                break;
            }

            Buffer outBuffer = new Buffer(DataSerializer.packetize(buffer));
            LOG.info("Writing packet: destination={} length={} status={} size={}", this.endpoint, buffer.length, StreamRegistry.instance().isClosed(this.streamId), StreamRegistry.instance().getSize(this.streamId));
            this.endpoint.write(outBuffer);

            // Keep pumping unless the stream has been closed AND fully drained.
            if (!StreamRegistry.instance().isClosed(this.streamId) || !StreamRegistry.instance().isEmpty(this.streamId)) {
                continue;
            }
            this.endpoint.close();
            LOG.info("Stream closed to {}", this.endpoint);
            break;
        }

        // The flag is still set when the loop was left via break, and cleared when
        // stop() was called or the thread was interrupted.
        if (this.isRunning.get()) {
            LOG.info("Thread shutdown normally.");
        } else {
            LOG.info("Thread shutdown due to disconnect or error.");
        }
    }

    /**
     * Requests that the writer stop after the chunk it is currently handling.
     *
     * <p>This only clears the running flag; a writer blocked waiting for the next chunk
     * will not notice until a chunk arrives or its thread is interrupted.
     */
    public void stop() {
        this.isRunning.set(false);
    }
}

