/*
 * Decompiled with CFR 0.152.
 */
package net.seninp.jmotif.sax.parallel;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import net.seninp.jmotif.sax.NumerosityReductionStrategy;
import net.seninp.jmotif.sax.SAXException;
import net.seninp.jmotif.sax.SAXProcessor;
import net.seninp.jmotif.sax.alphabet.NormalAlphabet;
import net.seninp.jmotif.sax.datastructure.SAXRecord;
import net.seninp.jmotif.sax.datastructure.SAXRecords;
import net.seninp.jmotif.sax.parallel.SAXWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the sliding-window SAX discretization of a time series on a fixed thread pool.
 *
 * <p>The series is cut into chunks whose index ranges overlap by the sliding window size, each
 * chunk is handed to a {@link SAXWorker}, and the per-chunk results are merged into a single
 * {@link SAXRecords} instance in whatever order the workers finish.
 */
public class ParallelSAXImplementation {
    /** Value written into the per-chunk status array once that chunk has been merged. */
    static final int COMPLETED_FLAG = -1;
    /**
     * Key of the entry in a worker's result map that carries the job id as decimal characters;
     * the entry is removed before the map is merged.
     */
    private static final int JOB_ID_KEY = -1;
    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSAXImplementation.class);
    /**
     * Pool used by the most recent {@link #process} call; {@code null} until the first call.
     * Volatile because {@link #cancel()} is only useful when invoked from a thread other than
     * the one blocked inside {@link #process}.
     */
    private volatile ExecutorService executorService;

    /**
     * Discretizes the time series using up to {@code threadsNum} worker threads.
     *
     * <p>When fewer than two threads are requested, or an even share of the series is not longer
     * than the sliding window, the work is done by the single-threaded
     * {@link SAXProcessor#ts2saxViaWindow} instead. A requested MINDIST reduction is never passed
     * to the workers: they run with no reduction and the MINDIST filter is applied to the merged
     * result afterwards.
     *
     * @param timeseries the input series
     * @param threadsNum the size of the thread pool; must be positive
     * @param slidingWindowSize the sliding window size
     * @param paaSize the PAA size handed to the workers
     * @param alphabetSize the alphabet size handed to the workers
     * @param numRedStrategy the requested numerosity reduction strategy
     * @param normalizationThreshold the normalization threshold handed to the workers
     * @return the merged records; {@code null} if the calling thread is found interrupted while
     *         results are being collected; possibly incomplete if a worker failed, the wait was
     *         interrupted, or no result arrived within 24 hours (these conditions are logged)
     * @throws SAXException declared for the underlying SAX routines
     * @throws RuntimeException if the thread pool cannot be terminated
     */
    public SAXRecords process(double[] timeseries, int threadsNum, int slidingWindowSize, int paaSize, int alphabetSize, NumerosityReductionStrategy numRedStrategy, double normalizationThreshold) throws SAXException {
        LOGGER.debug("Starting the parallel SAX");
        NormalAlphabet na = new NormalAlphabet();
        SAXProcessor sp = new SAXProcessor();
        SAXRecords res = new SAXRecords(0L);
        ExecutorService pool = Executors.newFixedThreadPool(threadsNum);
        this.executorService = pool;
        LOGGER.debug("Created thread pool of {} threads", threadsNum);

        // Workers never apply MINDIST themselves; it is applied once to the merged result.
        boolean minDistRequested = NumerosityReductionStrategy.MINDIST.equals(numRedStrategy);
        NumerosityReductionStrategy nrStrategy = NumerosityReductionStrategy.fromValue(numRedStrategy.index());
        if (NumerosityReductionStrategy.MINDIST.equals(nrStrategy)) {
            nrStrategy = NumerosityReductionStrategy.NONE;
        }

        ExecutorCompletionService<HashMap<Integer, char[]>> completionService = new ExecutorCompletionService<HashMap<Integer, char[]>>(pool);
        int totalTaskCounter = 0;
        // Job ids are tstamp + chunk number, so the chunk number can be recovered from a result.
        long tstamp = System.currentTimeMillis();

        int evenIncrement = timeseries.length / threadsNum;
        // A single thread cannot use the first/last chunk layout below (it would submit two jobs
        // for a one-slot status array), so it takes the single-threaded path as well.
        if (threadsNum < 2 || evenIncrement <= slidingWindowSize) {
            LOGGER.warn("Unable to run with {} threads. Rolling back to single-threaded implementation.", threadsNum);
            // Nothing was submitted; release the pool instead of leaving it open.
            pool.shutdown();
            SAXRecords single = sp.ts2saxViaWindow(timeseries, slidingWindowSize, paaSize, na.getCuts(alphabetSize), nrStrategy, normalizationThreshold);
            // Keep the fallback consistent with the parallel path: honour a requested MINDIST.
            return minDistRequested ? reduceByMinDist(sp, single) : single;
        }

        // The first chunk absorbs the remainder of the uneven division.
        int reminder = timeseries.length % threadsNum;
        int firstChunkSize = evenIncrement + reminder;
        LOGGER.debug("data size {}, evenIncrement {}, reminder {}, firstChunkSize {}", new Object[]{timeseries.length, evenIncrement, reminder, firstChunkSize});

        int firstChunkEnd = firstChunkSize - 1 + slidingWindowSize;
        completionService.submit(new SAXWorker(tstamp + (long)totalTaskCounter, timeseries, 0, firstChunkEnd, slidingWindowSize, paaSize, alphabetSize, nrStrategy, normalizationThreshold));
        LOGGER.debug("submitted first chunk job {}", tstamp);
        ++totalTaskCounter;

        while (totalTaskCounter < threadsNum - 1) {
            int intermediateChunkStart = firstChunkSize - 1 + (totalTaskCounter - 1) * evenIncrement + 1;
            int intermediateChunkEnd = firstChunkSize - 1 + totalTaskCounter * evenIncrement + slidingWindowSize;
            completionService.submit(new SAXWorker(tstamp + (long)totalTaskCounter, timeseries, intermediateChunkStart, intermediateChunkEnd, slidingWindowSize, paaSize, alphabetSize, nrStrategy, normalizationThreshold));
            LOGGER.debug("submitted intermediate chunk job {}", tstamp + (long)totalTaskCounter);
            ++totalTaskCounter;
        }

        int lastChunkStart = timeseries.length - evenIncrement;
        int lastChunkEnd = timeseries.length;
        completionService.submit(new SAXWorker(tstamp + (long)totalTaskCounter, timeseries, lastChunkStart, lastChunkEnd, slidingWindowSize, paaSize, alphabetSize, nrStrategy, normalizationThreshold));
        LOGGER.debug("submitted last chunk job {}", tstamp + (long)totalTaskCounter);
        ++totalTaskCounter;

        // No more submissions; already queued jobs still run to completion.
        pool.shutdown();

        int[] completedChunks = new int[threadsNum];
        boolean interrupted = false;
        try {
            while (totalTaskCounter > 0) {
                if (Thread.currentThread().isInterrupted()) {
                    LOGGER.info("Parallel SAX being interrupted, returning NULL!");
                    return null;
                }
                Future<HashMap<Integer, char[]>> finished = completionService.poll(24L, TimeUnit.HOURS);
                if (null == finished) {
                    LOGGER.info("Breaking POLL loop after 24 HOURS of waiting...");
                    break;
                }
                HashMap<Integer, char[]> chunkRes = finished.get();
                int idx = (int)(Long.parseLong(String.valueOf(chunkRes.get(JOB_ID_KEY))) - tstamp);
                LOGGER.debug("job with stamp {} of chunk {} has finished", chunkRes.get(JOB_ID_KEY), idx);
                LOGGER.debug("current completion status: {} completion flag: {}", Arrays.toString(completedChunks), COMPLETED_FLAG);
                chunkRes.remove(JOB_ID_KEY);
                mergeChunk(res, chunkRes, idx, completedChunks, nrStrategy);
                --totalTaskCounter;
            }
        }
        catch (InterruptedException e) {
            LOGGER.error("Error while waiting results.", e);
            // Remember the interruption; the flag is restored once the pool has been drained so
            // that the waits in cancel() and below are not cut short by it.
            interrupted = true;
            this.cancel();
        }
        catch (Exception e) {
            // Best effort: whatever has been merged so far is still returned.
            LOGGER.error("Error while waiting results.", e);
        }
        finally {
            try {
                if (!pool.awaitTermination(1L, TimeUnit.HOURS)) {
                    pool.shutdownNow();
                    if (!pool.awaitTermination(30L, TimeUnit.MINUTES)) {
                        LOGGER.error("Pool did not terminate... FATAL ERROR");
                        throw new RuntimeException("Parallel SAX pool did not terminate... FATAL ERROR");
                    }
                }
            }
            catch (InterruptedException ie) {
                LOGGER.error("Error while waiting interrupting.", ie);
                pool.shutdownNow();
                interrupted = true;
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        return minDistRequested ? reduceByMinDist(sp, res) : res;
    }

    /**
     * Merges one finished chunk into the accumulated result and marks it as completed.
     *
     * <p>With the NONE strategy, or when the result is still empty, the chunk is added as is.
     * Otherwise, for every neighbouring chunk that has already been merged, the words on both
     * sides of the shared boundary are compared and, under the EXACT strategy, the later of two
     * equal words is dropped.
     */
    private static void mergeChunk(SAXRecords res, HashMap<Integer, char[]> chunkRes, int idx, int[] completedChunks, NumerosityReductionStrategy nrStrategy) {
        boolean noReduction = nrStrategy.equals(NumerosityReductionStrategy.NONE);
        if (0 == res.size() || noReduction) {
            res.addAll(chunkRes);
            completedChunks[idx] = COMPLETED_FLAG;
            if (noReduction) {
                LOGGER.debug("merged in as is because the NR strategy is NONE");
            } else {
                LOGGER.debug("merged in as is because the result is empty");
            }
            return;
        }

        boolean exact = nrStrategy.equals(NumerosityReductionStrategy.EXACT);
        int lastIdx = completedChunks.length - 1;
        LOGGER.debug("processing chunk {}; res has results already...", idx);
        completedChunks[idx] = COMPLETED_FLAG;

        if (0 == idx) {
            if (completedChunks[1] == COMPLETED_FLAG) {
                LOGGER.debug("this is the very first chunk, merging the tail only");
                int chunkTailIndex = Collections.max(chunkRes.keySet());
                String tailStr = String.valueOf(chunkRes.get(chunkTailIndex));
                int resultHeadIndex = res.getMinIndex();
                String headStr = String.valueOf(res.getByIndex(resultHeadIndex).getPayload());
                LOGGER.debug("first index in the res {} for {}, last index in head {} for {}", new Object[]{resultHeadIndex, headStr, chunkTailIndex, tailStr});
                if (exact && headStr.equalsIgnoreCase(tailStr)) {
                    LOGGER.debug("res head {} at {} is dropped in favor of head tail {} at {}", new Object[]{headStr, resultHeadIndex, tailStr, chunkTailIndex});
                    res.dropByIndex(resultHeadIndex);
                }
            } else {
                LOGGER.debug("this is the very first chunk, but second is not yet in the results, merging all in");
            }
        } else if (lastIdx == idx) {
            if (completedChunks[idx - 1] == COMPLETED_FLAG) {
                LOGGER.debug("this is the very last chunk, merging the head only");
                int chunkHeadIndex = Collections.min(chunkRes.keySet());
                String headStr = String.valueOf(chunkRes.get(chunkHeadIndex));
                int resultTailIndex = res.getMaxIndex();
                String resStr = String.valueOf(res.getByIndex(resultTailIndex).getPayload());
                LOGGER.debug("last index in the res {} for {}, first index in the tail {} for {}", new Object[]{resultTailIndex, resStr, chunkHeadIndex, headStr});
                if (exact && resStr.equalsIgnoreCase(headStr)) {
                    LOGGER.debug("chunk head {} at {} is dropped in favor of res tail {} at {}", new Object[]{headStr, chunkHeadIndex, resStr, resultTailIndex});
                    chunkRes.remove(chunkHeadIndex);
                }
            } else {
                LOGGER.debug("this is the very last chunk, but previous is not yet in the results, merging all in");
            }
        } else {
            LOGGER.debug("processing chunk {}", idx);
            // Capture the chunk tail before the head may be removed below: a chunk holding a
            // single word would otherwise be empty by the time its tail is looked up.
            boolean nextDone = completedChunks[idx + 1] == COMPLETED_FLAG;
            int chunkTailIndex = nextDone ? Collections.max(chunkRes.keySet()) : -1;
            String tailStr = nextDone ? String.valueOf(chunkRes.get(chunkTailIndex)) : null;

            if (completedChunks[idx - 1] == COMPLETED_FLAG) {
                LOGGER.debug("previous chunk was completed, merging in");
                int chunkHeadIndex = Collections.min(chunkRes.keySet());
                String headStr = String.valueOf(chunkRes.get(chunkHeadIndex));
                // Walk down to the closest record already merged below this chunk.
                int resultTailIndex = chunkHeadIndex;
                while (null == res.getByIndex(resultTailIndex)) {
                    --resultTailIndex;
                }
                String resStr = String.valueOf(res.getByIndex(resultTailIndex).getPayload());
                LOGGER.debug("last index in the res {} for {}, first index in the chunk {} for {}", new Object[]{resultTailIndex, resStr, chunkHeadIndex, headStr});
                if (exact && resStr.equalsIgnoreCase(headStr)) {
                    LOGGER.debug("chunk head {} at {} is dropped in favor of res tail {} at {}", new Object[]{headStr, chunkHeadIndex, resStr, resultTailIndex});
                    chunkRes.remove(chunkHeadIndex);
                }
            }
            if (nextDone) {
                LOGGER.debug("next chunk was completed, merging the tail");
                // Walk up to the closest record already merged above this chunk.
                int resultHeadIndex = chunkTailIndex;
                while (null == res.getByIndex(resultHeadIndex)) {
                    ++resultHeadIndex;
                }
                String headStr = String.valueOf(res.getByIndex(resultHeadIndex).getPayload());
                LOGGER.debug("first index in the res {} for {}, last index in the chunk {} for {}", new Object[]{resultHeadIndex, headStr, chunkTailIndex, tailStr});
                if (exact && headStr.equalsIgnoreCase(tailStr)) {
                    LOGGER.debug("res head {} at {} is dropped in favor of chunk tail {} at {}", new Object[]{headStr, resultHeadIndex, tailStr, chunkTailIndex});
                    res.dropByIndex(resultHeadIndex);
                }
            }
        }
        res.addAll(chunkRes);
    }

    /**
     * Builds a copy of the records that skips every record for which
     * {@link SAXProcessor#checkMinDistIsZero} holds against the previously kept record, visiting
     * the records in the order returned by {@link SAXRecords#getAllIndices()}.
     */
    private static SAXRecords reduceByMinDist(SAXProcessor sp, SAXRecords records) throws SAXException {
        SAXRecords reduced = new SAXRecords();
        ArrayList<Integer> keys = records.getAllIndices();
        char[] previous = null;
        for (int i : keys) {
            SAXRecord entry = records.getByIndex(i);
            if (null != previous && sp.checkMinDistIsZero(entry.getPayload(), previous)) {
                continue;
            }
            reduced.add(entry.getPayload(), i);
            previous = entry.getPayload();
        }
        return reduced;
    }

    /**
     * Shuts down the pool of the most recent {@link #process} call: waits up to 30 minutes for
     * running jobs, then forces termination and waits up to another 30 minutes. Does nothing if
     * {@link #process} has not been called yet.
     *
     * @throws RuntimeException if the pool still has not terminated after the forced shutdown
     */
    public void cancel() {
        ExecutorService pool = this.executorService;
        if (null == pool) {
            // process() has not created a pool yet, so there is nothing to cancel.
            return;
        }
        try {
            pool.shutdown();
            if (!pool.awaitTermination(30L, TimeUnit.MINUTES)) {
                pool.shutdownNow();
                if (!pool.awaitTermination(30L, TimeUnit.MINUTES)) {
                    LOGGER.error("Pool did not terminate... FATAL ERROR");
                    throw new RuntimeException("Parallel SAX pool did not terminate... FATAL ERROR");
                }
            } else {
                LOGGER.error("Parallel SAX was interrupted by a request");
            }
        }
        catch (InterruptedException ie) {
            LOGGER.error("Error while waiting interrupting.", ie);
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

