/*
 * Decompiled with CFR 0.152.
 */
package cn.wizzer.iot.mqtt.server.broker.protocol;

import cn.hutool.core.util.StrUtil;
import cn.wizzer.iot.mqtt.server.common.message.IMessageIdService;
import cn.wizzer.iot.mqtt.server.common.message.IRetainMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.subscribe.ISubscribeStoreService;
import cn.wizzer.iot.mqtt.server.common.subscribe.SubscribeStore;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.AttributeKey;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Subscribe {
    private static final Logger LOGGER = LoggerFactory.getLogger(Subscribe.class);
    private ISubscribeStoreService subscribeStoreService;
    private IMessageIdService messageIdService;
    private IRetainMessageStoreService retainMessageStoreService;

    public Subscribe(ISubscribeStoreService subscribeStoreService, IMessageIdService messageIdService, IRetainMessageStoreService retainMessageStoreService) {
        this.subscribeStoreService = subscribeStoreService;
        this.messageIdService = messageIdService;
        this.retainMessageStoreService = retainMessageStoreService;
    }

    public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
        List topicSubscriptions = msg.payload().topicSubscriptions();
        if (this.validTopicFilter(topicSubscriptions)) {
            String clientId = (String)channel.attr(AttributeKey.valueOf((String)"clientId")).get();
            ArrayList mqttQoSList = new ArrayList();
            topicSubscriptions.forEach(topicSubscription -> {
                String topicFilter = topicSubscription.topicName();
                MqttQoS mqttQoS = topicSubscription.qualityOfService();
                SubscribeStore subscribeStore = new SubscribeStore(clientId, topicFilter, mqttQoS.value());
                this.subscribeStoreService.put(topicFilter, subscribeStore);
                mqttQoSList.add(mqttQoS.value());
                LOGGER.debug("SUBSCRIBE - clientId: {}, topFilter: {}, QoS: {}", new Object[]{clientId, topicFilter, mqttQoS.value()});
            });
            MqttSubAckMessage subAckMessage = (MqttSubAckMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), (Object)MqttMessageIdVariableHeader.from((int)msg.variableHeader().messageId()), (Object)new MqttSubAckPayload(mqttQoSList));
            channel.writeAndFlush((Object)subAckMessage);
            topicSubscriptions.forEach(topicSubscription -> {
                String topicFilter = topicSubscription.topicName();
                MqttQoS mqttQoS = topicSubscription.qualityOfService();
                this.sendRetainMessage(channel, topicFilter, mqttQoS);
            });
        } else {
            channel.close();
        }
    }

    private boolean validTopicFilter(List<MqttTopicSubscription> topicSubscriptions) {
        for (MqttTopicSubscription topicSubscription : topicSubscriptions) {
            String topicFilter = topicSubscription.topicName();
            if (StrUtil.startWith((CharSequence)topicFilter, (char)'+') || StrUtil.endWith((CharSequence)topicFilter, (char)'/')) {
                return false;
            }
            if (StrUtil.contains((CharSequence)topicFilter, (char)'#') && StrUtil.count((CharSequence)topicFilter, (char)'#') > 1) {
                return false;
            }
            if (!StrUtil.contains((CharSequence)topicFilter, (char)'+') || StrUtil.count((CharSequence)topicFilter, (char)'+') == StrUtil.count((CharSequence)topicFilter, (CharSequence)"/+")) continue;
            return false;
        }
        return true;
    }

    private void sendRetainMessage(Channel channel, String topicFilter, MqttQoS mqttQoS) {
        List retainMessageStores = this.retainMessageStoreService.search(topicFilter);
        retainMessageStores.forEach(retainMessageStore -> {
            MqttPublishMessage publishMessage;
            MqttQoS respQoS;
            MqttQoS mqttQoS2 = respQoS = retainMessageStore.getMqttQoS() > mqttQoS.value() ? mqttQoS : MqttQoS.valueOf((int)retainMessageStore.getMqttQoS());
            if (respQoS == MqttQoS.AT_MOST_ONCE) {
                MqttPublishMessage publishMessage2 = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, false, respQoS, false, 0), (Object)new MqttPublishVariableHeader(retainMessageStore.getTopic(), 0), (Object)Unpooled.buffer().writeBytes(retainMessageStore.getMessageBytes()));
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", new Object[]{(String)channel.attr(AttributeKey.valueOf((String)"clientId")).get(), retainMessageStore.getTopic(), respQoS.value()});
                channel.writeAndFlush((Object)publishMessage2);
            }
            if (respQoS == MqttQoS.AT_LEAST_ONCE) {
                int messageId = this.messageIdService.getNextMessageId();
                publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, false, respQoS, false, 0), (Object)new MqttPublishVariableHeader(retainMessageStore.getTopic(), messageId), (Object)Unpooled.buffer().writeBytes(retainMessageStore.getMessageBytes()));
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", new Object[]{(String)channel.attr(AttributeKey.valueOf((String)"clientId")).get(), retainMessageStore.getTopic(), respQoS.value(), messageId});
                channel.writeAndFlush((Object)publishMessage);
            }
            if (respQoS == MqttQoS.EXACTLY_ONCE) {
                int messageId = this.messageIdService.getNextMessageId();
                publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, false, respQoS, false, 0), (Object)new MqttPublishVariableHeader(retainMessageStore.getTopic(), messageId), (Object)Unpooled.buffer().writeBytes(retainMessageStore.getMessageBytes()));
                LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", new Object[]{(String)channel.attr(AttributeKey.valueOf((String)"clientId")).get(), retainMessageStore.getTopic(), respQoS.value(), messageId});
                channel.writeAndFlush((Object)publishMessage);
            }
        });
    }
}

