/*
 * Decompiled with CFR 0.152.
 */
package cn.wizzer.iot.mqtt.server.broker.internal;

import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.common.session.SessionStore;
import cn.wizzer.iot.mqtt.server.store.message.MessageIdService;
import cn.wizzer.iot.mqtt.server.store.session.SessionStoreService;
import cn.wizzer.iot.mqtt.server.store.subscribe.SubscribeStoreService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.List;
import java.util.Map;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@IocBean
public class InternalSendServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(InternalSendServer.class);
    @Inject
    private BrokerProperties brokerProperties;
    @Inject
    private SubscribeStoreService subscribeStoreService;
    @Inject
    private SessionStoreService sessionStoreService;
    @Inject
    private MessageIdService messageIdService;
    @Inject
    private ChannelGroup channelGroup;
    @Inject
    private Map<String, ChannelId> channelIdMap;

    public void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
        List subscribeStores = this.subscribeStoreService.search(topic);
        subscribeStores.forEach(subscribeStore -> {
            if (!clientId.equals(subscribeStore.getClientId()) && this.sessionStoreService.containsKey(subscribeStore.getClientId())) {
                Channel channel;
                ChannelId channelId;
                SessionStore sessionStore;
                MqttPublishMessage publishMessage;
                MqttQoS respQoS;
                MqttQoS mqttQoS2 = respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf((int)subscribeStore.getMqttQoS()) : mqttQoS;
                if (respQoS == MqttQoS.AT_MOST_ONCE) {
                    Channel channel2;
                    MqttPublishMessage publishMessage2 = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), (Object)new MqttPublishVariableHeader(topic, 0), (Object)Unpooled.buffer().writeBytes(messageBytes));
                    LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", new Object[]{subscribeStore.getClientId(), topic, respQoS.value()});
                    SessionStore sessionStore2 = this.sessionStoreService.get(subscribeStore.getClientId());
                    ChannelId channelId2 = this.channelIdMap.get(sessionStore2.getBrokerId() + "_" + sessionStore2.getChannelId());
                    if (channelId2 != null && (channel2 = this.channelGroup.find(channelId2)) != null) {
                        channel2.writeAndFlush((Object)publishMessage2);
                    }
                }
                if (respQoS == MqttQoS.AT_LEAST_ONCE) {
                    int messageId = this.messageIdService.getNextMessageId();
                    publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), (Object)new MqttPublishVariableHeader(topic, messageId), (Object)Unpooled.buffer().writeBytes(messageBytes));
                    LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", new Object[]{subscribeStore.getClientId(), topic, respQoS.value(), messageId});
                    sessionStore = this.sessionStoreService.get(subscribeStore.getClientId());
                    channelId = this.channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
                    if (channelId != null && (channel = this.channelGroup.find(channelId)) != null) {
                        channel.writeAndFlush((Object)publishMessage);
                    }
                }
                if (respQoS == MqttQoS.EXACTLY_ONCE) {
                    int messageId = this.messageIdService.getNextMessageId();
                    publishMessage = (MqttPublishMessage)MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBLISH, dup, respQoS, retain, 0), (Object)new MqttPublishVariableHeader(topic, messageId), (Object)Unpooled.buffer().writeBytes(messageBytes));
                    LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", new Object[]{subscribeStore.getClientId(), topic, respQoS.value(), messageId});
                    sessionStore = this.sessionStoreService.get(subscribeStore.getClientId());
                    channelId = this.channelIdMap.get(sessionStore.getBrokerId() + "_" + sessionStore.getChannelId());
                    if (channelId != null && (channel = this.channelGroup.find(channelId)) != null) {
                        channel.writeAndFlush((Object)publishMessage);
                    }
                }
            }
        });
    }
}

