/*
 * Decompiled with CFR 0.152.
 */
package net.scattersphere.server.handler.stream.message;

import java.util.HashMap;
import java.util.Map;
import net.scattersphere.data.message.JobMessage;
import net.scattersphere.data.message.JobParametersMessage;
import net.scattersphere.server.ClientConnection;
import net.scattersphere.server.handler.core.MessageHandler;
import net.scattersphere.server.handler.stream.SubscribedStreamWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.Handler;
import org.vertx.java.core.net.NetSocket;

public class StreamMessageHandler
implements MessageHandler {
    private static final String MESSAGE = "STREAM";
    private static final Map<String, NetSocket> clientStreamMap = new HashMap<String, NetSocket>();
    private static final Map<String, SubscribedStreamWriter> writerMap = new HashMap<String, SubscribedStreamWriter>();
    private final Logger LOG = LoggerFactory.getLogger(StreamMessageHandler.class);

    @Override
    public void handle(JobMessage message, ClientConnection client) {
        JobParametersMessage parameters = JobParametersMessage.fromByteArray(message.getPayload());
        String streamId = parameters.getJobName();
        String streamCommand = parameters.getJobMessage();
        this.LOG.info("Stream: ID={} Command={}", (Object)streamId, (Object)streamCommand);
        Object responseString = null;
        switch (streamCommand.toLowerCase()) {
            case "open": {
                this.startStream(streamId, client.getEndpoint());
                break;
            }
            case "close": {
                this.stopStream(streamId);
            }
        }
    }

    private void startStream(final String streamId, final NetSocket endpoint) {
        this.LOG.info("Starting stream: id={} endpoint={}", (Object)streamId, (Object)endpoint.toString());
        clientStreamMap.put(streamId, endpoint);
        String clientKey = endpoint.toString();
        SubscribedStreamWriter writer = new SubscribedStreamWriter(endpoint, streamId);
        writerMap.put(clientKey, writer);
        new Thread(writer).start();
        endpoint.closeHandler(new Handler<Void>(){

            @Override
            public void handle(Void aVoid) {
                String clientKey = endpoint.toString();
                StreamMessageHandler.this.LOG.info("Stopping stream: id={} endpoint={}", (Object)streamId, (Object)endpoint);
                endpoint.close();
                SubscribedStreamWriter writer = (SubscribedStreamWriter)writerMap.get(clientKey);
                if (writer != null) {
                    writer.stop();
                }
            }
        });
    }

    private void stopStream(String streamId) {
        SubscribedStreamWriter writer;
        NetSocket socket = clientStreamMap.remove(streamId);
        String clientKey = socket.toString();
        this.LOG.info("Stopping stream: id={} endpoint={}", (Object)streamId, (Object)socket);
        if (socket != null) {
            socket.close();
        }
        if ((writer = writerMap.get(clientKey)) != null) {
            writer.stop();
        }
    }

    @Override
    public boolean canHandle(JobMessage message) {
        return message != null && message.getMessage().equals(MESSAGE);
    }
}

