/*
 * Decompiled with CFR 0.152.
 */
package cn.wizzer.iot.mqtt.server.broker.protocol;

import cn.hutool.core.util.StrUtil;
import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.common.auth.IAuthService;
import cn.wizzer.iot.mqtt.server.common.message.IDupPubRelMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.message.IDupPublishMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.session.ISessionStoreService;
import cn.wizzer.iot.mqtt.server.common.session.SessionStore;
import cn.wizzer.iot.mqtt.server.common.subscribe.ISubscribeStoreService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles MQTT CONNECT packets for the broker.
 *
 * <p>{@link #processConnect(Channel, MqttConnectMessage)} validates the packet, optionally
 * authenticates the client, takes over any session previously stored for the same client
 * identifier, installs a keep-alive idle handler, stores the new session, answers with a
 * CONNACK and, for non-clean sessions, re-sends the stored unacknowledged PUBLISH / PUBREL
 * messages.
 */
public class Connect {
    private static final Logger LOGGER = LoggerFactory.getLogger(Connect.class);
    /** Name under which the keep-alive {@link IdleStateHandler} is registered in the pipeline. */
    private static final String IDLE_HANDLER_NAME = "idle";
    /** Name of the channel attribute that receives the client identifier. */
    private static final String CLIENT_ID_ATTRIBUTE = "clientId";

    private final ISessionStoreService sessionStoreService;
    private final ISubscribeStoreService subscribeStoreService;
    private final IDupPublishMessageStoreService dupPublishMessageStoreService;
    private final IDupPubRelMessageStoreService dupPubRelMessageStoreService;
    private final IAuthService authService;
    private final BrokerProperties brokerProperties;
    private final ChannelGroup channelGroup;
    private final Map<String, ChannelId> channelIdMap;

    /**
     * Creates the CONNECT handler with the collaborators it reads from and writes to.
     *
     * @param sessionStoreService           store of per-client sessions
     * @param subscribeStoreService         store of per-client subscriptions
     * @param dupPublishMessageStoreService store of PUBLISH messages awaiting acknowledgement
     * @param dupPubRelMessageStoreService  store of PUBREL messages awaiting acknowledgement
     * @param authService                   username/password validator
     * @param brokerProperties              broker configuration (broker id, whether a password is required)
     * @param channelGroup                  group used to look up a channel by its {@link ChannelId}
     * @param channelIdMap                  map from {@code brokerId + "_" + channelId} to {@link ChannelId}
     */
    public Connect(ISessionStoreService sessionStoreService, ISubscribeStoreService subscribeStoreService, IDupPublishMessageStoreService dupPublishMessageStoreService, IDupPubRelMessageStoreService dupPubRelMessageStoreService, IAuthService authService, BrokerProperties brokerProperties, ChannelGroup channelGroup, Map<String, ChannelId> channelIdMap) {
        this.sessionStoreService = sessionStoreService;
        this.subscribeStoreService = subscribeStoreService;
        this.dupPublishMessageStoreService = dupPublishMessageStoreService;
        this.dupPubRelMessageStoreService = dupPubRelMessageStoreService;
        this.authService = authService;
        this.brokerProperties = brokerProperties;
        this.channelGroup = channelGroup;
        this.channelIdMap = channelIdMap;
    }

    /**
     * Processes one CONNECT packet received on {@code channel}.
     *
     * <p>The connection is refused (CONNACK with a failure code, then close) when the packet
     * failed to decode because of an unacceptable protocol version or a rejected identifier,
     * when the client identifier is blank, or when a password is required and the credentials
     * are not valid. Any other decode failure closes the channel without a CONNACK.
     *
     * @param channel the channel the CONNECT arrived on
     * @param msg     the decoded CONNECT message
     */
    public void processConnect(Channel channel, MqttConnectMessage msg) {
        // The payload may be unusable when decoding failed, so this check must come first.
        if (msg.decoderResult().isFailure()) {
            rejectUndecodable(channel, msg.decoderResult().cause());
            return;
        }
        String clientId = msg.payload().clientIdentifier();
        if (StrUtil.isBlank(clientId)) {
            refuse(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            return;
        }
        if (this.brokerProperties.getMqttPasswordMust() && !hasValidCredentials(msg)) {
            refuse(channel, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }

        boolean cleanSession = msg.variableHeader().isCleanSession();
        // Must be decided BEFORE the new session is stored; afterwards containsKey() is always true.
        boolean sessionPresent = takeOverPreviousSession(clientId, cleanSession);
        int expire = installKeepAlive(channel, msg.variableHeader().keepAliveTimeSeconds());

        SessionStore sessionStore = new SessionStore(this.brokerProperties.getId(), clientId, channel.id().asLongText(), cleanSession, null, expire);
        if (msg.variableHeader().isWillFlag()) {
            sessionStore.setWillMessage(buildWillMessage(msg));
        }
        this.sessionStoreService.put(clientId, sessionStore, expire);
        channel.attr(AttributeKey.<String>valueOf(CLIENT_ID_ATTRIBUTE)).set(clientId);

        channel.writeAndFlush(connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent));
        LOGGER.debug("CONNECT - clientId: {}, cleanSession: {}", clientId, cleanSession);

        if (!cleanSession) {
            resendUnacknowledged(channel, clientId);
        }
    }

    /** Answers a CONNECT that failed to decode: a matching refusal code if one exists, else a bare close. */
    private void rejectUndecodable(Channel channel, Throwable cause) {
        if (cause instanceof MqttUnacceptableProtocolVersionException) {
            refuse(channel, MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
        } else if (cause instanceof MqttIdentifierRejectedException) {
            refuse(channel, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
        } else {
            channel.close();
        }
    }

    /** Sends a CONNACK carrying {@code returnCode} (session-present = false) and closes the channel. */
    private void refuse(Channel channel, MqttConnectReturnCode returnCode) {
        channel.writeAndFlush(connAck(returnCode, false));
        channel.close();
    }

    /** Builds a CONNACK message with the given return code and session-present flag. */
    private static MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
        return (MqttConnAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                new MqttConnAckVariableHeader(returnCode, sessionPresent),
                null);
    }

    /** Checks the CONNECT's username and (UTF-8 decoded, possibly null) password with the auth service. */
    private boolean hasValidCredentials(MqttConnectMessage msg) {
        byte[] passwordBytes = msg.payload().passwordInBytes();
        String password = passwordBytes == null ? null : new String(passwordBytes, CharsetUtil.UTF_8);
        return this.authService.checkValid(msg.payload().userName(), password);
    }

    /**
     * Reconciles state left by an earlier connection of the same client and closes that
     * connection's channel if it can still be found.
     *
     * <p>Stored subscriptions and unacknowledged messages survive only when a previous session
     * exists, it was not a clean session, and the new CONNECT does not ask for a clean session.
     * In every other case they are removed.
     *
     * @return {@code true} if previously stored session state was kept, i.e. the value to
     *         report as "session present" in the CONNACK
     */
    private boolean takeOverPreviousSession(String clientId, boolean cleanSession) {
        // get() may yield null even after containsKey() returned true (e.g. the entry vanished
        // in between); treat that the same as "no previous session".
        SessionStore previousSession = this.sessionStoreService.containsKey(clientId)
                ? this.sessionStoreService.get(clientId)
                : null;
        if (previousSession == null) {
            clearClientState(clientId);
            return false;
        }
        // A clean-session CONNECT must discard earlier state even if the old session was persistent.
        boolean discardState = previousSession.isCleanSession() || cleanSession;
        if (discardState) {
            this.sessionStoreService.remove(clientId);
            clearClientState(clientId);
        }
        closePreviousChannel(clientId, previousSession);
        return !discardState;
    }

    /** Removes the client's subscriptions and stored unacknowledged PUBLISH / PUBREL messages. */
    private void clearClientState(String clientId) {
        this.subscribeStoreService.removeForClient(clientId);
        this.dupPublishMessageStoreService.removeByClient(clientId);
        this.dupPubRelMessageStoreService.removeByClient(clientId);
    }

    /** Best-effort close of the channel recorded in {@code previousSession}; failures are logged, not thrown. */
    private void closePreviousChannel(String clientId, SessionStore previousSession) {
        try {
            ChannelId channelId = this.channelIdMap.get(previousSession.getBrokerId() + "_" + previousSession.getChannelId());
            if (channelId == null) {
                return;
            }
            Channel previous = this.channelGroup.find(channelId);
            if (previous != null) {
                previous.close();
            }
        } catch (Exception e) {
            // Closing the old connection must not prevent the new one from being accepted.
            LOGGER.warn("CONNECT - failed to close previous channel, clientId: {}", clientId, e);
        }
    }

    /**
     * Installs (or replaces) the idle handler that enforces the client's keep-alive.
     *
     * @return the all-idle timeout in seconds (1.5 x keep-alive, rounded), or 0 when the
     *         client requested no keep-alive and no handler was installed
     */
    private int installKeepAlive(Channel channel, int keepAliveSeconds) {
        if (keepAliveSeconds <= 0) {
            return 0;
        }
        if (channel.pipeline().names().contains(IDLE_HANDLER_NAME)) {
            channel.pipeline().remove(IDLE_HANDLER_NAME);
        }
        int expire = Math.round((float) keepAliveSeconds * 1.5f);
        channel.pipeline().addFirst(IDLE_HANDLER_NAME, (ChannelHandler) new IdleStateHandler(0, 0, expire));
        return expire;
    }

    /** Builds the will PUBLISH (message id 0) from the CONNECT's will topic, payload, QoS and retain flag. */
    private static MqttPublishMessage buildWillMessage(MqttConnectMessage msg) {
        return (MqttPublishMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
                new MqttPublishVariableHeader(msg.payload().willTopic(), 0),
                Unpooled.buffer().writeBytes(msg.payload().willMessageInBytes()));
    }

    /**
     * Re-sends the PUBLISH (flagged DUP) and PUBREL messages stored for {@code clientId}.
     *
     * <p>The calls are chained on the service results so the element types are inferred from
     * the services' declared return types instead of going through raw {@code List}s.
     */
    private void resendUnacknowledged(Channel channel, String clientId) {
        this.dupPublishMessageStoreService.get(clientId).forEach(stored -> {
            MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(stored.getMqttQoS()), false, 0),
                    new MqttPublishVariableHeader(stored.getTopic(), stored.getMessageId()),
                    Unpooled.buffer().writeBytes(stored.getMessageBytes()));
            channel.writeAndFlush(publishMessage);
        });
        this.dupPubRelMessageStoreService.get(clientId).forEach(stored -> {
            // MQTT 3.1.1 fixes the PUBREL flag bits to 0010: DUP = 0, QoS = 1, RETAIN = 0.
            MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0),
                    MqttMessageIdVariableHeader.from(stored.getMessageId()),
                    null);
            channel.writeAndFlush(pubRelMessage);
        });
    }
}

