/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.batch;

import ai.grakn.Keyspace;
import ai.grakn.batch.GraknClient;
import ai.grakn.batch.GraknClientException;
import ai.grakn.batch.QueryResponse;
import ai.grakn.graql.Query;
import ai.grakn.util.SimpleURI;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import java.io.Closeable;
import java.net.ConnectException;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

/**
 * Client that groups individually added queries into batches and executes them
 * through a {@link GraknClient}.
 *
 * <p>Queries handed to {@link #add(Query, Keyspace)} are collapsed per keyspace by a
 * Hystrix collapser and executed as a single batch command, which is retried on
 * retriable failures. A semaphore with {@code maxQueries} permits bounds the number
 * of queries that have been added but not yet executed; {@link #close()} waits for
 * all of those permits to come back before shutting down.
 */
public class BatchExecutorClient
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(BatchExecutorClient.class);
    // Client used to execute every batch of queries.
    private final GraknClient graknClient;
    // Hystrix request context initialised in the constructor and closed in close().
    private final HystrixRequestContext context;
    // Holds maxQueries permits: one is taken per added query and returned after its batch has run.
    private final Semaphore queryExecutionSemaphore;
    // Collapser timer delay, in milliseconds, i.e. how long queries are gathered into one batch.
    private final int maxDelay;
    // Number of retries; the retryer stops after maxRetries + 1 attempts.
    private final int maxRetries;
    // Number of permits in queryExecutionSemaphore.
    private final int maxQueries;
    // Size of the local fixed thread pool and core size of the Hystrix thread pool.
    private final int threadPoolCoreSize;
    // Passed to Hystrix as the execution timeout; note the batch command sets execution timeout enabled to false.
    private final int timeoutMs;
    // Registry in which all timers, meters and histograms of this client are registered.
    private final MetricRegistry metricRegistry;
    // Marked once for every added query whose observable terminates with an error.
    private final Meter failureMeter;
    // Times each added query from permit acquisition until its observable terminates.
    private final Timer addTimer;
    // Rx scheduler backed by executor; used to subscribe to the collapser observables.
    private final Scheduler scheduler;
    // Fixed thread pool backing scheduler; shut down in close().
    private final ExecutorService executor;
    // Forwarded to the Hystrix command properties of every batch command.
    private boolean requestLogEnabled;
    // Optional callback receiving every response of a successfully executed batch.
    @Nullable
    private Consumer<? super QueryResponse> queryResponseHandler = null;
    // Optional callback receiving the exception of a batch that ultimately failed.
    @Nullable
    private Consumer<? super Exception> exceptionHandler = null;
    // Unique id of this client; part of the collapser key so clients do not share collapsers.
    private final UUID id = UUID.randomUUID();

    /**
     * Creates a client from the settings collected by the given builder.
     *
     * <p>Besides copying the settings, this initialises the Hystrix request context,
     * creates the fixed thread pool (and the Rx scheduler on top of it), the semaphore
     * limiting in-flight queries, and registers the "add" timer and "failure" meter.
     *
     * @param builder source of all configuration values
     */
    private BatchExecutorClient(Builder builder) {
        // Side-effecting calls are kept in their original relative order.
        context = HystrixRequestContext.initializeContext();

        // Plain configuration copied from the builder.
        graknClient = builder.graknClient;
        maxDelay = builder.maxDelay;
        maxRetries = builder.maxRetries;
        maxQueries = builder.maxQueries;
        metricRegistry = builder.metricRegistry;
        timeoutMs = builder.timeoutMs;
        threadPoolCoreSize = builder.threadPoolCoreSize;
        requestLogEnabled = builder.requestLogEnabled;

        // Execution infrastructure derived from the configuration.
        executor = Executors.newFixedThreadPool(threadPoolCoreSize);
        scheduler = Schedulers.from(executor);
        queryExecutionSemaphore = new Semaphore(maxQueries);

        // Metrics owned by this client.
        addTimer = metricRegistry.timer(MetricRegistry.name(BatchExecutorClient.class, "add"));
        failureMeter = metricRegistry.meter(MetricRegistry.name(BatchExecutorClient.class, "failure"));
    }

    /**
     * Queues a query for batched execution against the given keyspace.
     *
     * <p>The call blocks (uninterruptibly) until one of the {@code maxQueries} permits is
     * available, then submits the query to the collapser for that keyspace and returns
     * without waiting for the query to be executed.
     *
     * <p>A failed query marks the failure meter and is logged. An explicit error consumer
     * is passed to {@code subscribe} because the no-argument {@code subscribe()} of RxJava
     * throws {@code OnErrorNotImplementedException} whenever the observable fails, which
     * would surface an already-reported batch failure as an exception on the emitting thread.
     *
     * @param query    the query to execute
     * @param keyspace the keyspace the query is executed against
     */
    public void add(Query<?> query, Keyspace keyspace) {
        QueryRequest queryRequest = new QueryRequest(query);
        queryRequest.acquirePermit();
        Timer.Context contextAddTimer = this.addTimer.time();
        Observable<QueryResponse> observable = new QueriesObservableCollapser(queryRequest, keyspace)
                .observe()
                .doOnError(error -> this.failureMeter.mark())
                .subscribeOn(this.scheduler)
                .doOnTerminate(contextAddTimer::close);
        // Subscribing is what triggers execution; responses are delivered through the
        // registered response handler, so onNext is deliberately a no-op here.
        observable.subscribe(
                response -> { },
                error -> LOG.error("Error while executing query", error));
    }

    /**
     * Registers the callback that receives every response of a successfully executed
     * batch. Replaces any previously registered callback.
     *
     * @param queryResponseHandler consumer invoked once per response
     */
    public void onNext(Consumer<? super QueryResponse> queryResponseHandler) {
        this.queryResponseHandler = queryResponseHandler;
    }

    /**
     * Registers the callback that receives the exception of a batch whose execution
     * failed after retrying. Replaces any previously registered callback.
     *
     * @param exceptionHandler consumer invoked with the failure
     */
    public void onError(Consumer<? super Exception> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * Waits for all outstanding queries to finish, then releases the resources of this client.
     *
     * <p>The method acquires all {@code maxQueries} permits, which only succeeds once every
     * added query has returned its permit, then closes the Hystrix request context and shuts
     * down the thread pool. The permits are intentionally not given back.
     *
     * <p>As required by {@link Closeable#close()}, calling this method on an already closed
     * client has no effect. Without the guard a second call would block forever trying to
     * acquire permits that the first call still holds. The method is synchronized so that a
     * concurrent second call waits for the first one and then returns.
     */
    @Override
    public synchronized void close() {
        if (this.executor.isShutdown()) {
            LOG.debug("BatchExecutorClient already closed");
            return;
        }
        LOG.debug("Closing BatchExecutorClient");
        LOG.trace("Acquiring all {} permits ({} available)", this.maxQueries, this.queryExecutionSemaphore.availablePermits());
        this.queryExecutionSemaphore.acquireUninterruptibly(this.maxQueries);
        LOG.trace("Acquired all {} permits ({} available)", this.maxQueries, this.queryExecutionSemaphore.availablePermits());
        try {
            this.context.close();
        } finally {
            // Always stop the pool, even if closing the request context fails; this also
            // marks the client as closed for subsequent calls.
            this.executor.shutdownNow();
        }
    }

    /**
     * Creates a builder pre-populated with the default settings.
     *
     * @return a new builder
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Creates a builder whose client is obtained from {@link GraknClient#of} for the given URI.
     *
     * @param simpleURI the URI handed to {@code GraknClient.of}
     * @return a new builder with its client already set
     */
    public static Builder newBuilderforURI(SimpleURI simpleURI) {
        Builder builder = new Builder();
        GraknClient client = GraknClient.of(simpleURI);
        return builder.taskClient(client);
    }

    /**
     * Builds the collapser key for a keyspace. The key contains the id of this client
     * and the keyspace, so it differs per client instance and per keyspace.
     *
     * @param keyspace the keyspace the collapser batches queries for
     * @return the Hystrix collapser key
     */
    private HystrixCollapserKey hystrixCollapserKey(Keyspace keyspace) {
        String keyName = String.format("QueriesObservableCollapser_%s_%s", this.id, keyspace);
        return HystrixCollapserKey.Factory.asKey(keyName);
    }

    private class QueriesObservableCollapser
    extends HystrixCollapser<List<QueryResponse>, QueryResponse, QueryRequest> {
        private final QueryRequest query;
        private Keyspace keyspace;

        QueriesObservableCollapser(QueryRequest query, Keyspace keyspace) {
            super(HystrixCollapser.Setter.withCollapserKey((HystrixCollapserKey)BatchExecutorClient.this.hystrixCollapserKey(keyspace)).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withRequestCacheEnabled(false).withTimerDelayInMilliseconds(BatchExecutorClient.this.maxDelay)));
            this.query = query;
            this.keyspace = keyspace;
        }

        public QueryRequest getRequestArgument() {
            return this.query;
        }

        protected HystrixCommand<List<QueryResponse>> createCommand(Collection<HystrixCollapser.CollapsedRequest<QueryResponse, QueryRequest>> collapsedRequests) {
            List<QueryRequest> requests = collapsedRequests.stream().map(HystrixCollapser.CollapsedRequest::getArgument).collect(Collectors.toList());
            return new CommandQueries(requests, this.keyspace);
        }

        protected void mapResponseToRequests(List<QueryResponse> batchResponse, Collection<HystrixCollapser.CollapsedRequest<QueryResponse, QueryRequest>> collapsedRequests) {
            int count = 0;
            for (HystrixCollapser.CollapsedRequest<QueryResponse, QueryRequest> request : collapsedRequests) {
                QueryResponse response = batchResponse.get(count++);
                request.setResponse((Object)response);
                request.setComplete();
            }
            BatchExecutorClient.this.metricRegistry.histogram(MetricRegistry.name(QueriesObservableCollapser.class, (String[])new String[]{"batch", "size"})).update(collapsedRequests.size());
        }
    }

    private class CommandQueries
    extends HystrixCommand<List<QueryResponse>> {
        static final int QUEUE_MULTIPLIER = 1024;
        private final List<QueryRequest> queries;
        private final Keyspace keyspace;
        private final Timer graqlExecuteTimer;
        private final Meter attemptMeter;
        private final Retryer<List> retryer;

        CommandQueries(List<QueryRequest> queries, Keyspace keyspace) {
            super(HystrixCommand.Setter.withGroupKey((HystrixCommandGroupKey)HystrixCommandGroupKey.Factory.asKey((String)"BatchExecutor")).andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(BatchExecutorClient.this.threadPoolCoreSize).withQueueSizeRejectionThreshold(BatchExecutorClient.this.threadPoolCoreSize * 1024).withMaxQueueSize(BatchExecutorClient.this.threadPoolCoreSize * 1024)).andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutEnabled(false).withExecutionTimeoutInMilliseconds(BatchExecutorClient.this.timeoutMs).withRequestLogEnabled(BatchExecutorClient.this.requestLogEnabled)));
            this.queries = queries;
            this.keyspace = keyspace;
            this.graqlExecuteTimer = BatchExecutorClient.this.metricRegistry.timer(MetricRegistry.name(((Object)((Object)this)).getClass(), (String[])new String[]{"execute"}));
            this.attemptMeter = BatchExecutorClient.this.metricRegistry.meter(MetricRegistry.name(((Object)((Object)this)).getClass(), (String[])new String[]{"attempt"}));
            this.retryer = RetryerBuilder.newBuilder().retryIfException(throwable -> throwable instanceof GraknClientException && ((GraknClientException)throwable).isRetriable()).retryIfExceptionOfType(ConnectException.class).withWaitStrategy(WaitStrategies.exponentialWait((long)10L, (long)1L, (TimeUnit)TimeUnit.MINUTES)).withStopStrategy(StopStrategies.stopAfterAttempt((int)(BatchExecutorClient.this.maxRetries + 1))).withRetryListener(new RetryListener(){

                public <V> void onRetry(Attempt<V> attempt) {
                    CommandQueries.this.attemptMeter.mark();
                }
            }).build();
        }

        protected List run() throws GraknClientException {
            List queryList = this.queries.stream().map(QueryRequest::getQuery).collect(Collectors.toList());
            try {
                List responses = (List)this.retryer.call(() -> {
                    try (Timer.Context c = this.graqlExecuteTimer.time();){
                        List<QueryResponse> list = BatchExecutorClient.this.graknClient.graqlExecute(queryList, this.keyspace);
                        return list;
                    }
                });
                if (BatchExecutorClient.this.queryResponseHandler != null) {
                    responses.forEach(BatchExecutorClient.this.queryResponseHandler);
                }
                List list = responses;
                return list;
            }
            catch (RetryException | ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof GraknClientException) {
                    if (BatchExecutorClient.this.exceptionHandler != null) {
                        BatchExecutorClient.this.exceptionHandler.accept((GraknClientException)cause);
                    }
                    throw (GraknClientException)cause;
                }
                RuntimeException exception = new RuntimeException("Unexpected exception while retrying, " + queryList.size() + " queries failed.", e);
                if (BatchExecutorClient.this.exceptionHandler != null) {
                    BatchExecutorClient.this.exceptionHandler.accept(exception);
                }
                throw exception;
            }
            finally {
                this.queries.forEach(QueryRequest::releasePermit);
            }
        }
    }

    /**
     * A single query together with a randomly generated id, plus the operations to take
     * and return the semaphore permit accounting for it.
     */
    private class QueryRequest {
        private final Query<?> query;
        private final UUID id;

        /**
         * @param query the query wrapped by this request
         */
        QueryRequest(Query<?> query) {
            this.query = query;
            this.id = UUID.randomUUID();
        }

        /** Two requests are equal when both their query and their id are equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            QueryRequest other = (QueryRequest) o;
            boolean sameQuery = query == null ? other.query == null : query.equals(other.query);
            if (!sameQuery) {
                return false;
            }
            return id == null ? other.id == null : id.equals(other.id);
        }

        /** Hash combining query and id, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            int queryHash = query == null ? 0 : query.hashCode();
            int idHash = id == null ? 0 : id.hashCode();
            return 31 * queryHash + idHash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("QueryRequest{query=");
            text.append(query).append(", id=").append(id).append('}');
            return text.toString();
        }

        /** Returns the wrapped query. */
        public Query<?> getQuery() {
            return query;
        }

        /** Blocks uninterruptibly until a permit is available and takes it. */
        void acquirePermit() {
            Semaphore permits = BatchExecutorClient.this.queryExecutionSemaphore;
            assert (permits.availablePermits() <= BatchExecutorClient.this.maxQueries) : "Number of available permits should never exceed max queries";
            LOG.trace("Acquiring a permit for {} ({} available)", id, permits.availablePermits());
            permits.acquireUninterruptibly();
            LOG.trace("Acquired a permit for {} ({} available)", id, permits.availablePermits());
        }

        /** Returns one permit to the semaphore. */
        void releasePermit() {
            Semaphore permits = BatchExecutorClient.this.queryExecutionSemaphore;
            permits.release();
            int remaining = permits.availablePermits();
            LOG.trace("Released a permit for {} ({} available)", id, remaining);
        }
    }

    /**
     * Fluent builder for {@link BatchExecutorClient}.
     *
     * <p>Defaults: maxDelay 50, maxRetries 5, threadPoolCoreSize 8, timeoutMs 60000,
     * maxQueries 10000, request log disabled, and a fresh {@link MetricRegistry}. The
     * client itself has no default and must be set with {@link #taskClient(GraknClient)}.
     */
    public static final class Builder {
        private GraknClient graknClient;
        private int maxDelay = 50;
        private int maxRetries = 5;
        private int threadPoolCoreSize = 8;
        private int timeoutMs = 60000;
        private int maxQueries = 10000;
        private boolean requestLogEnabled = false;
        private MetricRegistry metricRegistry = new MetricRegistry();

        private Builder() {
        }

        /**
         * Sets the client used to execute the batches.
         *
         * @param val the client; required before {@link #build()}
         * @return this builder
         */
        public Builder taskClient(GraknClient val) {
            this.graknClient = val;
            return this;
        }

        /**
         * Sets the collapser timer delay in milliseconds.
         *
         * @param val the delay in milliseconds
         * @return this builder
         */
        public Builder maxDelay(int val) {
            this.maxDelay = val;
            return this;
        }

        /**
         * Sets the number of retries; a batch is attempted at most {@code val + 1} times.
         *
         * @param val the number of retries
         * @return this builder
         */
        public Builder maxRetries(int val) {
            this.maxRetries = val;
            return this;
        }

        /**
         * Sets the size of the client's thread pool and the Hystrix thread pool core size.
         *
         * @param val the number of threads
         * @return this builder
         */
        public Builder threadPoolCoreSize(int val) {
            this.threadPoolCoreSize = val;
            return this;
        }

        /**
         * Sets the registry in which the client registers its metrics.
         *
         * @param val the metric registry
         * @return this builder
         */
        public Builder metricRegistry(MetricRegistry val) {
            this.metricRegistry = val;
            return this;
        }

        /**
         * Sets the maximum number of queries that may be added but not yet executed.
         *
         * @param val the number of permits; must be positive when {@link #build()} is called
         * @return this builder
         */
        public Builder maxQueries(int val) {
            this.maxQueries = val;
            return this;
        }

        /**
         * Enables or disables the Hystrix request log for the batch commands.
         *
         * @param val {@code true} to enable the request log
         * @return this builder
         */
        public Builder requestLogEnabled(boolean val) {
            this.requestLogEnabled = val;
            return this;
        }

        /**
         * Creates the client.
         *
         * <p>Fails fast on settings that cannot yield a working client: without a
         * {@code GraknClient} every batch would fail with a {@code NullPointerException}
         * when executed, and with a non-positive {@code maxQueries} no permit could ever be
         * acquired, so {@code add} would block forever.
         *
         * @return the configured client
         * @throws IllegalStateException if no client was set or {@code maxQueries} is not positive
         */
        public BatchExecutorClient build() {
            if (this.graknClient == null) {
                throw new IllegalStateException("A GraknClient must be set with taskClient(...) before calling build()");
            }
            if (this.maxQueries <= 0) {
                throw new IllegalStateException("maxQueries must be positive but was " + this.maxQueries);
            }
            return new BatchExecutorClient(this);
        }
    }
}

