/*
 * Decompiled with CFR 0.152.
 */
package io.growing.sdk.java.store.impl;

import io.growing.sdk.java.dto.GIOMessage;
import io.growing.sdk.java.exception.GIOSendBeRejectedException;
import io.growing.sdk.java.logger.GioLogger;
import io.growing.sdk.java.sender.FixThreadPoolSender;
import io.growing.sdk.java.sender.MessageSender;
import io.growing.sdk.java.store.StoreStrategyAbstract;
import io.growing.sdk.java.thread.GioThreadNamedFactory;
import io.growing.sdk.java.utils.ConfigUtils;
import io.growing.sdk.java.utils.ExecutorServiceUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AbortPolicyStoreStrategy
extends StoreStrategyAbstract {
    private static final int THREADS = ConfigUtils.getIntValue("send.msg.thread", 3);
    private static final int LIMIT = ConfigUtils.getIntValue("msg.store.queue.size", 500);
    private static double loadfactor = ConfigUtils.getDoubleValue("msg.store.queue.load_factor", 0.5);
    private static final int SEND_INTERVAL = ConfigUtils.getIntValue("send.msg.interval", 100);
    protected static final int SPEED_THREAD_POOL_TIMEOUT = ConfigUtils.getIntValue("speed.thread_pool.timeout", 1000);
    private static final int SEND_MSG_BATCH_SIZE = 100;
    private static final AtomicBoolean queueWillFull = new AtomicBoolean(false);
    private static final ScheduledThreadPoolExecutor speedSendScheduler = new ScheduledThreadPoolExecutor(THREADS, new GioThreadNamedFactory("gio-speed-send-msg-schedule"));
    private static final ScheduledExecutorService sendMsgSchedule = new ScheduledThreadPoolExecutor(1, new GioThreadNamedFactory("gio-send-msg-schedule"));
    private static final Map<String, List<GIOMessage>> BATCH_MSG_MAP = new ConcurrentHashMap<String, List<GIOMessage>>();
    private static final MessageSender SENDER = new FixThreadPoolSender();

    private static void beforeFull() {
        if (loadfactor < 0.0 || loadfactor > 1.0) {
            GioLogger.error("msg.store.queue.load_factor cannot be less than 0 or greater than 1, use default value: 0.5");
            loadfactor = 0.5;
        }
        if ((double)messageBlockingQueue.size() > (double)LIMIT * loadfactor) {
            queueWillFull.compareAndSet(false, true);
            GioLogger.debug("msg queue is almost full");
        } else {
            queueWillFull.compareAndSet(true, false);
        }
        if (queueWillFull.get()) {
            speedSendScheduler.schedule(new SendRunnable(), (long)((1.0 - loadfactor) * (double)SEND_INTERVAL) / 10L, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void doPush(GIOMessage msg) {
        if (!messageBlockingQueue.offer(msg)) {
            throw new GIOSendBeRejectedException("push was rejected, because msg queue is full, suggest greater size for [msg.store.queue.size] or shorten the interval of [send.msg.interval]");
        }
        AbortPolicyStoreStrategy.beforeFull();
    }

    @Override
    public void awaitTerminationAfterShutdown() {
        ExecutorServiceUtils.awaitTerminationAfterShutdown(pushMsgThreadPool, PUSH_THREAD_POOL_TIMEOUT);
        if (!messageBlockingQueue.isEmpty()) {
            GioLogger.error("awaitTerminationAfterShutdown was executed, msg queue size: " + messageBlockingQueue.size() + " is not empty, will wait it " + AWAIT + "s");
            try {
                TimeUnit.SECONDS.sleep(AWAIT);
            }
            catch (InterruptedException e) {
                GioLogger.error(e.getLocalizedMessage());
            }
        }
        ExecutorServiceUtils.awaitTerminationAfterShutdown(sendMsgSchedule, SEND_THREAD_POOL_TIMEOUT);
        ExecutorServiceUtils.awaitTerminationAfterShutdown(speedSendScheduler, SPEED_THREAD_POOL_TIMEOUT);
        SENDER.awaitTermination(SENDER_THREAD_POOL_TIMEOUT);
    }

    @Override
    public void shutDownNow() {
        pushMsgThreadPool.shutdownNow();
        sendMsgSchedule.shutdownNow();
        speedSendScheduler.shutdownNow();
        SENDER.shutdownNow();
    }

    static {
        messageBlockingQueue = new ArrayBlockingQueue(LIMIT);
        sendMsgSchedule.scheduleWithFixedDelay(new SendRunnable(), SEND_INTERVAL, SEND_INTERVAL, TimeUnit.MILLISECONDS);
    }

    static class SendRunnable
    implements Runnable {
        SendRunnable() {
        }

        private int currentBatchMsgSize() {
            int size = 0;
            Collection values = BATCH_MSG_MAP.values();
            for (List msgList : values) {
                size += msgList.size();
            }
            return size;
        }

        @Override
        public void run() {
            while (!messageBlockingQueue.isEmpty() && this.currentBatchMsgSize() < 100) {
                List<GIOMessage> list;
                GIOMessage gioMessage = (GIOMessage)messageBlockingQueue.poll();
                if (gioMessage == null) continue;
                String projectKey = gioMessage.getProjectKey();
                if (BATCH_MSG_MAP.containsKey(projectKey)) {
                    list = (List)BATCH_MSG_MAP.get(projectKey);
                    list.add(gioMessage);
                    continue;
                }
                list = new ArrayList();
                list.add(gioMessage);
                BATCH_MSG_MAP.put(projectKey, list);
            }
            for (Map.Entry entry : BATCH_MSG_MAP.entrySet()) {
                if (entry.getValue() == null || ((List)entry.getValue()).isEmpty()) continue;
                SENDER.sendMsg((String)entry.getKey(), (List)BATCH_MSG_MAP.remove(entry.getKey()));
            }
        }
    }
}

