/*
 * Decompiled with CFR 0.152.
 */
package io.aboutcode.stage.web.websocket;

import io.aboutcode.stage.subscription.SubscriptionManager;
import io.aboutcode.stage.util.Action;
import io.aboutcode.stage.util.Tuple2;
import io.aboutcode.stage.web.websocket.WebsocketIo;
import io.aboutcode.stage.web.websocket.standard.TypedWebsocketMessage;
import io.aboutcode.stage.web.websocket.standard.WebsocketClientSession;
import io.aboutcode.stage.web.websocket.standard.WebsocketContext;
import io.aboutcode.stage.web.websocket.standard.WebsocketDataHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Jetty websocket endpoint that keeps one {@link WebsocketClientSession} per connected
 * {@link Session} and delegates connection events and typed messages to the registered
 * {@link WebsocketDataHandler}s.
 *
 * <p>Incoming text frames are deserialized through the supplied {@link WebsocketIo} and routed
 * to the message handlers that the data handlers registered on the client session for the
 * message's concrete class. Outgoing broadcasts are routed through a subscription manager
 * shared by all sessions.
 *
 * <p>The session registry and the handler set are concurrent collections because a single
 * instance of this class tracks many sessions whose callbacks are not serialized by this class.
 *
 * @param <MessageT> base type of all messages exchanged over this endpoint
 */
@WebSocket
public final class DelegatingWebsocketHandler<MessageT extends TypedWebsocketMessage> {
    private static final Logger logger = LoggerFactory.getLogger(DelegatingWebsocketHandler.class);

    /** Subscriptions of client sessions to broadcast topics, shared by all sessions. */
    private final SubscriptionManager<Long, Consumer<MessageT>, Void> broadcastSubscriptionManager = SubscriptionManager.asynchronous();
    /** Currently connected sessions, keyed by the underlying Jetty session. */
    private final Map<Session, DefaultWebsocketClientSession> sessions = new ConcurrentHashMap<>();
    private final WebsocketIo<MessageT> io;
    private final Set<WebsocketDataHandler<MessageT>> handlers = ConcurrentHashMap.newKeySet();

    /**
     * Creates a handler that uses the specified io to convert between text frames and messages.
     *
     * @param io serializer/deserializer for the messages of this endpoint
     */
    public DelegatingWebsocketHandler(WebsocketIo<MessageT> io) {
        this.io = io;
    }

    /**
     * Builds the topic under which request handlers for the given message type are registered
     * for the given session.
     */
    private static Tuple2<Class<?>, Session> createTopic(Session session, Class<?> type) {
        return Tuple2.of(type, session);
    }

    /**
     * Registers the handler and, if it was not registered before, initializes it with a context
     * that allows publishing messages to subscribed or to all clients.
     *
     * @param handler the handler to add; must not be {@code null}
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    public void addandInitialize(WebsocketDataHandler<MessageT> handler) {
        Objects.requireNonNull(handler, "handler");
        if (!handlers.add(handler)) {
            // already registered - do not initialize a second time
            return;
        }

        handler.initialize(new WebsocketContext<MessageT>() {
            @Override
            public void publishToSubscribedClients(String topic, MessageT message) {
                broadcastSubscriptionManager.forTopic(topic, (subscriber, handback, context) -> subscriber.accept(message));
            }

            @Override
            public void publishToAllClients(MessageT message) {
                broadcastSubscriptionManager.forAll((subscriber, handback, context) -> subscriber.accept(message));
            }
        });
    }

    /**
     * Deserializes the text frame and dispatches the resulting message to the handlers that
     * were registered on the client session for the message's concrete class.
     *
     * <p>Messages for sessions that are not (or no longer) registered are logged and dropped.
     *
     * @param session the session the frame was received on
     * @param message the raw text frame
     */
    @OnWebSocketMessage
    public void onMessage(Session session, String message) {
        DefaultWebsocketClientSession clientSession = sessions.get(session);
        if (clientSession == null) {
            logger.warn("Message '{}' received for unknown or already disconnected session, ignoring", message);
            return;
        }

        try {
            MessageT request = io.deserialize(message);
            clientSession.dispatch(
                    createTopic(session, request.getClass()),
                    (subscriber, handback, context) -> subscriber.accept(request));
        } catch (IOException e) {
            logger.error("Message '{}' not handled by endpoint: {}", message, e.getMessage(), e);
        }
    }

    /**
     * Creates and registers the client session for a new connection and notifies all handlers.
     *
     * @param session the newly connected session
     */
    @OnWebSocketConnect
    public void onConnect(Session session) {
        // The status passed by the cleanup action is only used for the log line in onDisconnect.
        DefaultWebsocketClientSession clientSession = new DefaultWebsocketClientSession(
                session,
                () -> onDisconnect(session, 404, "Remote session terminated unexpectedly"));
        sessions.put(session, clientSession);
        handlers.forEach(handler -> handler.onConnected(clientSession));
    }

    /**
     * Removes the client session, cancels and removes all of its subscriptions and notifies
     * all handlers.
     *
     * <p>This method may be reached twice for the same session: once through the cleanup action
     * of the client session (send on a closed session) and once through the close callback. The
     * second invocation finds no registered session and returns without side effects.
     *
     * @param session the session that was closed
     * @param status  the close status, used for logging only
     * @param reason  the close reason, used for logging only
     */
    @OnWebSocketClose
    public void onDisconnect(Session session, int status, String reason) {
        logger.debug("Disconnecting with status '{}' because: {}", status, reason);
        DefaultWebsocketClientSession clientSession = sessions.remove(session);
        if (clientSession == null) {
            logger.debug("Session is not registered or was already cleaned up, nothing to disconnect");
            return;
        }

        clientSession.dispatchToAll((subscriber, handback, context) -> context.cancel());
        clientSession.unsubscribeClientFromAll();
        handlers.forEach(handler -> handler.onDisconnected(clientSession));
    }

    /**
     * Logs errors reported for a websocket connection.
     *
     * @param session the session the error occurred on
     * @param error   the reported error
     */
    @OnWebSocketError
    public void onError(Session session, Throwable error) {
        logger.error("Error in websocket connection", error);
    }

    /**
     * Client session backed by a Jetty {@link Session}. Holds the per-client state, the
     * request handlers registered for this client and the handles of its broadcast
     * subscriptions. Also serves as the {@link WriteCallback} for its own asynchronous writes.
     */
    private class DefaultWebsocketClientSession implements WebsocketClientSession<MessageT>, WriteCallback {
        /** Arbitrary per-client state; every access is guarded by the map's monitor. */
        private final Map<String, Object> state = new HashMap<>();
        /** Handle of the broadcast subscription for each topic this client subscribed to. */
        private final Map<Object, Long> broadcastTopicToHandle = new HashMap<>();
        private final Session session;
        private final SubscriptionManager<Long, Consumer<MessageT>, Void> requestSubscriptionManager = SubscriptionManager.asynchronous();
        /** Handles of all request handlers registered on this session. */
        private final List<Long> requestSubscriptionHandles = new ArrayList<>();
        /** Executed when a send is attempted although the underlying session is closed. */
        private final Action cleanupAction;

        DefaultWebsocketClientSession(Session session, Action cleanupAction) {
            this.session = session;
            this.cleanupAction = cleanupAction;
        }

        /** Applies the action to all request handlers registered for the topic. */
        void dispatch(Object topic, SubscriptionManager.SubscriptionAction<Consumer<MessageT>, Void> action) {
            requestSubscriptionManager.forTopic(topic, action);
        }

        /** Applies the action to all request handlers of this session. */
        void dispatchToAll(SubscriptionManager.SubscriptionAction<Consumer<MessageT>, Void> action) {
            requestSubscriptionManager.forAll(action);
        }

        /**
         * Sends the message to the client if the session is open; otherwise logs a warning and
         * runs the cleanup action.
         */
        @Override
        public void send(MessageT message) {
            if (!session.isOpen()) {
                logger.warn("Unresponsive client at '{}', closing connection", session.getRemoteAddress());
                cleanupAction.accept();
                return;
            }
            sendRequest(session, message);
        }

        @Override
        public void close(int statusCode, String reason) {
            session.close(statusCode, reason);
        }

        /** Serializes the message and writes it asynchronously; this session is the write callback. */
        private void sendRequest(Session session, MessageT message) {
            try {
                session.getRemote().sendString(io.serialize(message), this);
            } catch (IOException e) {
                logger.error("Error serializing message '{}': {}", message, e.getMessage(), e);
            }
        }

        /**
         * Subscribes this client to the broadcast topic. Subscribing to a topic the client is
         * already subscribed to has no effect.
         */
        @Override
        public void subscribeClientToTopic(Object topic) {
            broadcastTopicToHandle.computeIfAbsent(
                    topic,
                    key -> broadcastSubscriptionManager.subscribe(key, null, this::send));
        }

        /**
         * Registers a handler for requests of the given type; a non-null result of the handler
         * is sent back to this client.
         */
        @Override
        public <RequestT extends MessageT, ResponseT extends MessageT> void registerMessageHandler(Class<RequestT> requestType, Function<RequestT, ResponseT> messageHandler) {
            // The topic contains the request class, so only messages of exactly that class are
            // dispatched to this subscriber and the checked cast succeeds.
            Consumer<MessageT> subscriber = message -> processRequest(requestType.cast(message), messageHandler);
            Long handle = requestSubscriptionManager.subscribe(createTopic(session, requestType), null, subscriber);
            requestSubscriptionHandles.add(handle);
        }

        /** Registers a consumer for requests of the given type; nothing is sent back. */
        @Override
        public <RequestT extends MessageT> void registerMessageConsumer(Class<RequestT> requestType, Consumer<RequestT> messageConsumer) {
            registerMessageHandler(requestType, message -> {
                messageConsumer.accept(message);
                return null;
            });
        }

        /**
         * Runs the handler for the request and sends a non-null response. Exceptions thrown by
         * the handler or while sending are logged and not propagated.
         */
        private <RequestT extends MessageT, ResponseT extends MessageT> void processRequest(RequestT request, Function<RequestT, ResponseT> messageHandler) {
            try {
                ResponseT response = messageHandler.apply(request);
                if (response != null) {
                    send(response);
                }
            } catch (Exception e) {
                logger.error("Error processing response for request '{}': {}", request, e.getMessage(), e);
            }
        }

        /** Removes all broadcast subscriptions and all request handlers of this session. */
        void unsubscribeClientFromAll() {
            broadcastTopicToHandle.values().forEach(handle -> broadcastSubscriptionManager.unsubscribe(handle));
            broadcastTopicToHandle.clear();
            requestSubscriptionHandles.forEach(handle -> requestSubscriptionManager.unsubscribe(handle));
            requestSubscriptionHandles.clear();
        }

        @Override
        public void addState(String identifier, Object value) {
            synchronized (state) {
                state.put(identifier, value);
            }
        }

        /** Removes and returns the state; empty if nothing (or {@code null}) was stored. */
        @Override
        public Optional<Object> removeState(String identifier) {
            synchronized (state) {
                return Optional.ofNullable(state.remove(identifier));
            }
        }

        /** Returns the state; empty if nothing (or {@code null}) is stored. */
        @Override
        public Optional<Object> getState(String identifier) {
            synchronized (state) {
                return Optional.ofNullable(state.get(identifier));
            }
        }

        @Override
        public void writeFailed(Throwable error) {
            // The first argument fills the placeholder, the trailing throwable is logged with
            // its stack trace.
            logger.error("Error in sending message to websocket connection: {}", error, error);
        }

        @Override
        public void writeSuccess() {
            // nothing to do on successful writes
        }

        @Override
        public Map<String, List<String>> headers() {
            return session.getUpgradeRequest().getHeaders();
        }

        @Override
        public List<String> headers(String name) {
            return session.getUpgradeRequest().getHeaders(name);
        }

        @Override
        public Optional<String> header(String name) {
            return Optional.ofNullable(session.getUpgradeRequest().getHeader(name));
        }
    }
}

