/*
 * Decompiled with CFR 0.152.
 */
package cn.wizzer.iot.mqtt.server.broker.server;

import cn.wizzer.iot.mqtt.server.broker.codec.MqttWebSocketCodec;
import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.broker.handler.BrokerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.InputStream;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.Map;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLEngine;
import org.nutz.boot.starter.ServerFace;
import org.nutz.ioc.Ioc;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.lang.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@IocBean
public class BrokerServer
implements ServerFace {
    private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServer.class);
    @Inject
    private BrokerProperties brokerProperties;
    @Inject(value="refer:$ioc")
    private Ioc ioc;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private SslContext sslContext;
    private Channel channel;
    private Channel websocketChannel;
    private ChannelGroup channelGroup;
    private Map<String, ChannelId> channelIdMap;

    public void start() throws Exception {
        LOGGER.info("Initializing {} MQTT Broker ...", (Object)("[" + this.brokerProperties.getId() + "]"));
        this.channelGroup = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
        this.channelIdMap = new HashMap<String, ChannelId>();
        this.bossGroup = this.brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        Object object = this.workerGroup = this.brokerProperties.getUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        if (this.brokerProperties.getSslEnabled()) {
            KeyStore keyStore = KeyStore.getInstance("PKCS12");
            InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("keystore/server.pfx");
            keyStore.load(inputStream, this.brokerProperties.getSslPassword().toCharArray());
            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
            kmf.init(keyStore, this.brokerProperties.getSslPassword().toCharArray());
            this.sslContext = SslContextBuilder.forServer((KeyManagerFactory)kmf).build();
        }
        this.mqttServer();
        if (this.brokerProperties.getWebsocketEnabled()) {
            this.websocketServer();
            LOGGER.info("MQTT Broker {} is up and running. Open Port: {} WebSocketPort: {}", new Object[]{"[" + this.brokerProperties.getId() + "]", this.brokerProperties.getPort(), this.brokerProperties.getWebsocketPort()});
        } else {
            LOGGER.info("MQTT Broker {} is up and running. Open Port: {} ", (Object)("[" + this.brokerProperties.getId() + "]"), (Object)this.brokerProperties.getPort());
        }
    }

    public void stop() {
        LOGGER.info("Shutdown {} MQTT Broker ...", (Object)("[" + this.brokerProperties.getId() + "]"));
        this.channelGroup = null;
        this.channelIdMap = null;
        this.bossGroup.shutdownGracefully();
        this.bossGroup = null;
        this.workerGroup.shutdownGracefully();
        this.workerGroup = null;
        this.channel.closeFuture().syncUninterruptibly();
        this.channel = null;
        this.websocketChannel.closeFuture().syncUninterruptibly();
        this.websocketChannel = null;
        LOGGER.info("MQTT Broker {} shutdown finish.", (Object)("[" + this.brokerProperties.getId() + "]"));
    }

    @IocBean(name="channelGroup")
    public ChannelGroup getChannels() {
        return this.channelGroup;
    }

    @IocBean(name="channelIdMap")
    public Map<String, ChannelId> getChannelIdMap() {
        return this.channelIdMap;
    }

    private void mqttServer() throws Exception {
        ServerBootstrap sb = new ServerBootstrap();
        ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)sb.group(this.bossGroup, this.workerGroup).channel(this.brokerProperties.getUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)).handler((ChannelHandler)new LoggingHandler(LogLevel.INFO))).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline channelPipeline = socketChannel.pipeline();
                channelPipeline.addFirst("idle", (ChannelHandler)new IdleStateHandler(0, 0, BrokerServer.this.brokerProperties.getKeepAlive()));
                if (BrokerServer.this.brokerProperties.getSslEnabled()) {
                    SSLEngine sslEngine = BrokerServer.this.sslContext.newEngine(socketChannel.alloc());
                    sslEngine.setUseClientMode(false);
                    sslEngine.setNeedClientAuth(false);
                    channelPipeline.addLast("ssl", (ChannelHandler)new SslHandler(sslEngine));
                }
                channelPipeline.addLast("decoder", (ChannelHandler)new MqttDecoder());
                channelPipeline.addLast("encoder", (ChannelHandler)MqttEncoder.INSTANCE);
                channelPipeline.addLast("broker", (ChannelHandler)BrokerServer.this.ioc.get(BrokerHandler.class));
            }
        }).option(ChannelOption.SO_BACKLOG, (Object)this.brokerProperties.getSoBacklog())).childOption(ChannelOption.SO_KEEPALIVE, (Object)this.brokerProperties.getSoKeepAlive());
        this.channel = Strings.isNotBlank((CharSequence)this.brokerProperties.getHost()) ? sb.bind(this.brokerProperties.getHost(), this.brokerProperties.getPort()).sync().channel() : sb.bind(this.brokerProperties.getPort()).sync().channel();
    }

    private void websocketServer() throws Exception {
        ServerBootstrap sb = new ServerBootstrap();
        ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)sb.group(this.bossGroup, this.workerGroup).channel(this.brokerProperties.getUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)).handler((ChannelHandler)new LoggingHandler(LogLevel.INFO))).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline channelPipeline = socketChannel.pipeline();
                channelPipeline.addFirst("idle", (ChannelHandler)new IdleStateHandler(0, 0, BrokerServer.this.brokerProperties.getKeepAlive()));
                if (BrokerServer.this.brokerProperties.getSslEnabled()) {
                    SSLEngine sslEngine = BrokerServer.this.sslContext.newEngine(socketChannel.alloc());
                    sslEngine.setUseClientMode(false);
                    sslEngine.setNeedClientAuth(false);
                    channelPipeline.addLast("ssl", (ChannelHandler)new SslHandler(sslEngine));
                }
                channelPipeline.addLast("http-codec", (ChannelHandler)new HttpServerCodec());
                channelPipeline.addLast("aggregator", (ChannelHandler)new HttpObjectAggregator(0x100000));
                channelPipeline.addLast("compressor ", (ChannelHandler)new HttpContentCompressor());
                channelPipeline.addLast("protocol", (ChannelHandler)new WebSocketServerProtocolHandler(BrokerServer.this.brokerProperties.getWebsocketPath(), "mqtt,mqttv3.1,mqttv3.1.1", true, 65536));
                channelPipeline.addLast("mqttWebSocket", (ChannelHandler)new MqttWebSocketCodec());
                channelPipeline.addLast("decoder", (ChannelHandler)new MqttDecoder());
                channelPipeline.addLast("encoder", (ChannelHandler)MqttEncoder.INSTANCE);
                channelPipeline.addLast("broker", (ChannelHandler)BrokerServer.this.ioc.get(BrokerHandler.class));
            }
        }).option(ChannelOption.SO_BACKLOG, (Object)this.brokerProperties.getSoBacklog())).childOption(ChannelOption.SO_KEEPALIVE, (Object)this.brokerProperties.getSoKeepAlive());
        this.websocketChannel = Strings.isNotBlank((CharSequence)this.brokerProperties.getHost()) ? sb.bind(this.brokerProperties.getHost(), this.brokerProperties.getWebsocketPort()).sync().channel() : sb.bind(this.brokerProperties.getWebsocketPort()).sync().channel();
    }
}

