/*
 * Decompiled with CFR 0.152.
 */
package cn.wizzer.iot.mqtt.server.broker.protocol;

import cn.wizzer.iot.mqtt.server.broker.internal.InternalCommunication;
import cn.wizzer.iot.mqtt.server.broker.internal.InternalMessage;
import cn.wizzer.iot.mqtt.server.common.message.DupPublishMessageStore;
import cn.wizzer.iot.mqtt.server.common.message.IDupPublishMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.message.IMessageIdService;
import cn.wizzer.iot.mqtt.server.common.message.IRetainMessageStoreService;
import cn.wizzer.iot.mqtt.server.common.message.RetainMessageStore;
import cn.wizzer.iot.mqtt.server.common.session.ISessionStoreService;
import cn.wizzer.iot.mqtt.server.common.session.SessionStore;
import cn.wizzer.iot.mqtt.server.common.subscribe.ISubscribeStoreService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.AttributeKey;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Publish {
    private static final Logger LOGGER = LoggerFactory.getLogger(Publish.class);
    private ISessionStoreService sessionStoreService;
    private ISubscribeStoreService subscribeStoreService;
    private IMessageIdService messageIdService;
    private IRetainMessageStoreService retainMessageStoreService;
    private IDupPublishMessageStoreService dupPublishMessageStoreService;
    private InternalCommunication internalCommunication;
    private ChannelGroup channelGroup;
    private Map<String, ChannelId> channelIdMap;

    public Publish(ISessionStoreService sessionStoreService, ISubscribeStoreService subscribeStoreService, IMessageIdService messageIdService, IRetainMessageStoreService retainMessageStoreService, IDupPublishMessageStoreService dupPublishMessageStoreService, InternalCommunication internalCommunication, ChannelGroup channelGroup, Map<String, ChannelId> channelIdMap) {
        this.sessionStoreService = sessionStoreService;
        this.subscribeStoreService = subscribeStoreService;
        this.messageIdService = messageIdService;
        this.retainMessageStoreService = retainMessageStoreService;
        this.dupPublishMessageStoreService = dupPublishMessageStoreService;
        this.internalCommunication = internalCommunication;
        this.channelGroup = channelGroup;
        this.channelIdMap = channelIdMap;
    }

    public void processPublish(Channel channel, MqttPublishMessage msg) {
        InternalMessage internalMessage;
        byte[] messageBytes;
        String clientId = (String)channel.attr(AttributeKey.valueOf((String)"clientId")).get();
        if (msg.fixedHeader().qosLevel() == MqttQoS.AT_MOST_ONCE) {
            messageBytes = new byte[msg.payload().readableBytes()];
            msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
            internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes).setDup(false).setRetain(false).setClientId(clientId);
            this.internalCommunication.internalSend(internalMessage);
            this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            messageBytes = new byte[msg.payload().readableBytes()];
            msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
            internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes).setDup(false).setRetain(false).setClientId(clientId);
            this.internalCommunication.internalSend(internalMessage);
            this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
            this.sendPubAckMessage(channel, msg.variableHeader().packetId());
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
            messageBytes = new byte[msg.payload().readableBytes()];
            msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
            internalMessage = new InternalMessage().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes).setDup(false).setRetain(false).setClientId(clientId);
            this.internalCommunication.internalSend(internalMessage);
            this.sendPublishMessage(msg.variableHeader().topicName(), msg.fixedHeader().qosLevel(), messageBytes, false, false);
            this.sendPubRecMessage(channel, msg.variableHeader().packetId());
        }
        if (msg.fixedHeader().isRetain()) {
            messageBytes = new byte[msg.payload().readableBytes()];
            msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
            if (messageBytes.length == 0) {
                this.retainMessageStoreService.remove(msg.variableHeader().topicName());
            } else {
                RetainMessageStore retainMessageStore = new RetainMessageStore().setTopic(msg.variableHeader().topicName()).setMqttQoS(msg.fixedHeader().qosLevel().value()).setMessageBytes(messageBytes);
                this.retainMessageStoreService.put(msg.variableHeader().topicName(), retainMessageStore);
            }
        }
    }

    private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
        List subscribeStores = this.subscribeStoreService.search(topic);
        subscribeStores.forEach(subscribeStore -> {
            if (this.sessionStoreService.containsKey(subscribeStore.getClientId())) {
                Channel channel;
                ChannelId channelId;
                SessionStore sessionStore;
                DupPublishMessageStore dupPublishMessageStore;
                MqttPublishMessage publishMessage;
                MqttQoS respQoS;
                MqttQoS mqttQoS2 = respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf((int)subscribeStore.getMqttQoS()) : mqttQoS;
                if (respQoS == MqttQoS.AT_MOST_ONCE) {
                    Channel channel2;
                    MqttPublishMessage publishMessage2 = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), (Object)new MqttPublishVariableHeader(topic, 0), (Object)Unpooled.buffer().writeBytes(messageBytes));
                    LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", new Object[]{subscribeStore.getClientId(), topic, respQoS.value()});
                    SessionStore sessionStore2 = this.sessionStoreService.get(subscribeStore.getClientId());
                    ChannelId channelId2 = this.channelIdMap.get(sessionStore2.getBrokerId() + "_" + sessionStore2.getChannelId());
                    if (channelId2 != null && (channel2 = this.channelGroup.find(channelId2)) != null) {
                        channel2.writeAndFlush((Object)publishMessage2);
                    }
                }
                if (respQoS == MqttQoS.AT_LEAST_ONCE) {
                    int messageId = this.messageIdService.getNextMessageId();
                    publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), (Object)new MqttPublishVariableHeader(topic, messageId), (Object)Unpooled.buffer().writeBytes(messageBytes));
                    LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", new Object[]{subscribeStore.getClientId(), topic, respQoS.value(), messageId});
                    dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId()).setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
                    this.dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
                    sessionStore = this.sessionStoreService.get(subscribeStore.getClientId());
                    channelId = this.channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
                    if (channelId != null && (channel = this.channelGroup.find(channelId)) != null) {
                        channel.writeAndFlush((Object)publishMessage);
                    }
                }
                if (respQoS == MqttQoS.EXACTLY_ONCE) {
                    int messageId = this.messageIdService.getNextMessageId();
                    publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), (Object)new MqttPublishVariableHeader(topic, messageId), (Object)Unpooled.buffer().writeBytes(messageBytes));
                    LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", new Object[]{subscribeStore.getClientId(), topic, respQoS.value(), messageId});
                    dupPublishMessageStore = new DupPublishMessageStore().setClientId(subscribeStore.getClientId()).setTopic(topic).setMqttQoS(respQoS.value()).setMessageBytes(messageBytes).setMessageId(messageId);
                    this.dupPublishMessageStoreService.put(subscribeStore.getClientId(), dupPublishMessageStore);
                    sessionStore = this.sessionStoreService.get(subscribeStore.getClientId());
                    channelId = this.channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
                    if (channelId != null && (channel = this.channelGroup.find(channelId)) != null) {
                        channel.writeAndFlush((Object)publishMessage);
                    }
                }
            }
        });
    }

    private void sendPubAckMessage(Channel channel, int messageId) {
        MqttPubAckMessage pubAckMessage = (MqttPubAckMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), (Object)MqttMessageIdVariableHeader.from((int)messageId), null);
        channel.writeAndFlush((Object)pubAckMessage);
    }

    private void sendPubRecMessage(Channel channel, int messageId) {
        MqttMessage pubRecMessage = MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0), (Object)MqttMessageIdVariableHeader.from((int)messageId), null);
        channel.writeAndFlush((Object)pubRecMessage);
    }
}

