/*
 * Decompiled with CFR 0.152.
 */
package cn.wizzer.iot.mqtt.server.broker.handler;

import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.broker.protocol.ProtocolProcess;
import cn.wizzer.iot.mqtt.server.common.session.SessionStore;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.util.Map;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;

@IocBean
@ChannelHandler.Sharable
public class BrokerHandler
extends SimpleChannelInboundHandler<MqttMessage> {
    @Inject
    private ProtocolProcess protocolProcess;
    @Inject
    private BrokerProperties brokerProperties;
    @Inject
    private ChannelGroup channelGroup;
    @Inject
    private Map<String, ChannelId> channelIdMap;

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channelGroup.add((Object)ctx.channel());
        this.channelIdMap.put(this.brokerProperties.getId() + "_" + ctx.channel().id().asLongText(), ctx.channel().id());
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        this.channelGroup.remove((Object)ctx.channel());
        this.channelIdMap.remove(this.brokerProperties.getId() + "_" + ctx.channel().id().asLongText());
    }

    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        if (msg.decoderResult().isFailure()) {
            Throwable cause = msg.decoderResult().cause();
            if (cause instanceof MqttUnacceptableProtocolVersionException) {
                ctx.writeAndFlush((Object)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), (Object)new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null));
            } else if (cause instanceof MqttIdentifierRejectedException) {
                ctx.writeAndFlush((Object)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), (Object)new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null));
            }
            ctx.close();
            return;
        }
        switch (msg.fixedHeader().messageType()) {
            case CONNECT: {
                this.protocolProcess.connect().processConnect(ctx.channel(), (MqttConnectMessage)msg);
                break;
            }
            case CONNACK: {
                break;
            }
            case PUBLISH: {
                this.protocolProcess.publish().processPublish(ctx.channel(), (MqttPublishMessage)msg);
                break;
            }
            case PUBACK: {
                this.protocolProcess.pubAck().processPubAck(ctx.channel(), (MqttMessageIdVariableHeader)msg.variableHeader());
                break;
            }
            case PUBREC: {
                this.protocolProcess.pubRec().processPubRec(ctx.channel(), (MqttMessageIdVariableHeader)msg.variableHeader());
                break;
            }
            case PUBREL: {
                this.protocolProcess.pubRel().processPubRel(ctx.channel(), (MqttMessageIdVariableHeader)msg.variableHeader());
                break;
            }
            case PUBCOMP: {
                this.protocolProcess.pubComp().processPubComp(ctx.channel(), (MqttMessageIdVariableHeader)msg.variableHeader());
                break;
            }
            case SUBSCRIBE: {
                this.protocolProcess.subscribe().processSubscribe(ctx.channel(), (MqttSubscribeMessage)msg);
                break;
            }
            case SUBACK: {
                break;
            }
            case UNSUBSCRIBE: {
                this.protocolProcess.unSubscribe().processUnSubscribe(ctx.channel(), (MqttUnsubscribeMessage)msg);
                break;
            }
            case UNSUBACK: {
                break;
            }
            case PINGREQ: {
                this.protocolProcess.pingReq().processPingReq(ctx.channel(), msg);
                break;
            }
            case PINGRESP: {
                break;
            }
            case DISCONNECT: {
                this.protocolProcess.disConnect().processDisConnect(ctx.channel(), msg);
                break;
            }
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            ctx.close();
        } else {
            super.exceptionCaught(ctx, cause);
        }
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                SessionStore sessionStore;
                Channel channel = ctx.channel();
                String clientId = (String)channel.attr(AttributeKey.valueOf((String)"clientId")).get();
                if (this.protocolProcess.getSessionStoreService().containsKey(clientId) && (sessionStore = this.protocolProcess.getSessionStoreService().get(clientId)).getWillMessage() != null) {
                    this.protocolProcess.publish().processPublish(ctx.channel(), sessionStore.getWillMessage());
                }
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

