/*
 * Decompiled with CFR 0.152.
 */
package com.hubspot.singularity.executor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.SingularityTaskShellCommandRequest;
import com.hubspot.singularity.executor.SingularityExecutorMonitor;
import com.hubspot.singularity.executor.config.SingularityExecutorConfiguration;
import com.hubspot.singularity.executor.models.ThreadCheckerType;
import com.hubspot.singularity.executor.shells.SingularityExecutorShellCommandRunner;
import com.hubspot.singularity.executor.shells.SingularityExecutorShellCommandUpdater;
import com.hubspot.singularity.executor.task.SingularityExecutorTaskProcessCallable;
import com.hubspot.singularity.executor.utils.DockerUtils;
import com.hubspot.singularity.runner.base.shared.ProcessFailedException;
import com.hubspot.singularity.runner.base.shared.SimpleProcessManager;
import com.spotify.docker.client.exceptions.DockerException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.NOPLogger;

@Singleton
public class SingularityExecutorThreadChecker {
    private static final Logger LOG = LoggerFactory.getLogger(SingularityExecutorThreadChecker.class);
    private static Pattern CGROUP_CPU_REGEX = Pattern.compile("^\\d+:cpu:/(.*)$");
    private static Pattern PROC_STATUS_THREADS_REGEX = Pattern.compile("Threads:\\s*(\\d+)\\s*$");
    private final SingularityExecutorConfiguration configuration;
    private final ScheduledExecutorService scheduledExecutorService;
    private final DockerUtils dockerUtils;
    private final ObjectMapper objectMapper;
    private SingularityExecutorMonitor monitor;

    @Inject
    public SingularityExecutorThreadChecker(SingularityExecutorConfiguration configuration, DockerUtils dockerUtils, ObjectMapper objectMapper) {
        this.configuration = configuration;
        this.dockerUtils = dockerUtils;
        this.objectMapper = objectMapper;
        this.scheduledExecutorService = Executors.newScheduledThreadPool(configuration.getThreadCheckThreads(), new ThreadFactoryBuilder().setNameFormat("SingularityExecutorThreadCheckerThread-%d").build());
    }

    public void start(SingularityExecutorMonitor monitor) {
        LOG.info("Starting a thread checker that will run every {}", (Object)JavaUtils.durationFromMillis((long)this.configuration.getCheckThreadsEveryMillis()));
        this.monitor = monitor;
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                long start = System.currentTimeMillis();
                try {
                    SingularityExecutorThreadChecker.this.checkThreads();
                }
                catch (Throwable t) {
                    LOG.error("While checking threads", t);
                }
                finally {
                    LOG.trace("Finished checking threads after {}", (Object)JavaUtils.duration((long)start));
                }
            }
        }, this.configuration.getCheckThreadsEveryMillis(), this.configuration.getCheckThreadsEveryMillis(), TimeUnit.MILLISECONDS);
    }

    private void checkThreads() {
        for (final SingularityExecutorTaskProcessCallable taskProcess : this.monitor.getRunningTasks()) {
            if (!taskProcess.getTask().getExecutorData().getMaxTaskThreads().isPresent()) continue;
            int maxThreads = (Integer)taskProcess.getTask().getExecutorData().getMaxTaskThreads().get();
            final AtomicInteger usedThreads = new AtomicInteger(0);
            try {
                usedThreads.set(this.getNumUsedThreads(taskProcess));
                LOG.trace("{} is using {} threads", (Object)taskProcess.getTask().getTaskId(), (Object)usedThreads);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
            catch (Throwable t) {
                if (taskProcess.wasKilled()) continue;
                taskProcess.getTask().getLog().error("While fetching used threads for {}", (Object)taskProcess.getTask().getTaskId(), (Object)t);
                continue;
            }
            if (usedThreads.get() <= maxThreads) continue;
            taskProcess.getTask().getLog().info("{} using too many threads: {} (max {})", new Object[]{taskProcess.getTask().getTaskId(), usedThreads, maxThreads});
            if (this.configuration.getRunShellCommandBeforeKillDueToThreads().isPresent()) {
                SingularityTaskShellCommandRequest shellRequest = new SingularityTaskShellCommandRequest(SingularityTaskId.valueOf((String)taskProcess.getTask().getTaskId()), Optional.empty(), System.currentTimeMillis(), this.configuration.getRunShellCommandBeforeKillDueToThreads().get());
                SingularityExecutorShellCommandUpdater updater = new SingularityExecutorShellCommandUpdater(this.objectMapper, shellRequest, taskProcess.getTask());
                SingularityExecutorShellCommandRunner shellRunner = new SingularityExecutorShellCommandRunner(shellRequest, this.configuration, taskProcess.getTask(), taskProcess, this.monitor.getShellCommandExecutorServiceForTask(taskProcess.getTask().getTaskId()), updater);
                Futures.addCallback(shellRunner.start(), (FutureCallback)new FutureCallback<Integer>(){

                    public void onSuccess(Integer result) {
                        taskProcess.getTask().markKilledDueToThreads(usedThreads.get());
                        SingularityExecutorMonitor.KillState killState = SingularityExecutorThreadChecker.this.monitor.requestKill(taskProcess.getTask().getTaskId());
                        taskProcess.getTask().getLog().info("Killing {} due to thread overage (kill state {})", (Object)taskProcess.getTask().getTaskId(), (Object)killState);
                    }

                    public void onFailure(Throwable t) {
                        taskProcess.getTask().getLog().warn("Unable to run pre-threadkill shell command {} for {}!", new Object[]{SingularityExecutorThreadChecker.this.configuration.getRunShellCommandBeforeKillDueToThreads().get().getName(), taskProcess.getTask().getTaskId(), t});
                        taskProcess.getTask().markKilledDueToThreads(usedThreads.get());
                        SingularityExecutorMonitor.KillState killState = SingularityExecutorThreadChecker.this.monitor.requestKill(taskProcess.getTask().getTaskId());
                        taskProcess.getTask().getLog().info("Killing {} due to thread overage (kill state {})", (Object)taskProcess.getTask().getTaskId(), (Object)killState);
                    }
                }, (Executor)this.monitor.getShellCommandExecutorServiceForTask(taskProcess.getTask().getTaskId()));
                continue;
            }
            taskProcess.getTask().markKilledDueToThreads(usedThreads.get());
            SingularityExecutorMonitor.KillState killState = this.monitor.requestKill(taskProcess.getTask().getTaskId());
            taskProcess.getTask().getLog().info("Killing {} due to thread overage (kill state {})", (Object)taskProcess.getTask().getTaskId(), (Object)killState);
        }
    }

    public ExecutorService getExecutorService() {
        return this.scheduledExecutorService;
    }

    private int getNumUsedThreads(SingularityExecutorTaskProcessCallable taskProcess) throws InterruptedException, ProcessFailedException {
        Optional<Integer> dockerPid = Optional.empty();
        if (taskProcess.getTask().getTaskInfo().hasContainer() && taskProcess.getTask().getTaskInfo().getContainer().hasDocker()) {
            try {
                String containerName = String.format("%s%s", this.configuration.getDockerPrefix(), taskProcess.getTask().getTaskId());
                int possiblePid = this.dockerUtils.getPid(containerName);
                if (possiblePid == 0) {
                    LOG.warn(String.format("Container %s has pid %s (running: %s). Defaulting to 0 threads running.", containerName, possiblePid, this.dockerUtils.isContainerRunning(containerName)));
                    return 0;
                }
                dockerPid = Optional.of(possiblePid);
            }
            catch (DockerException e) {
                throw new ProcessFailedException("Could not get docker root pid due to error", (Throwable)e);
            }
        }
        try {
            Optional<Integer> numThreads = this.getNumThreads(this.configuration.getThreadCheckerType(), taskProcess, dockerPid);
            if (numThreads.isPresent()) {
                return numThreads.get();
            }
            LOG.warn("Could not get num threads using {} thread checker", (Object)this.configuration.getThreadCheckerType());
            return 0;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Optional<Integer> getNumThreads(ThreadCheckerType type, SingularityExecutorTaskProcessCallable taskProcess, Optional<Integer> dockerPid) throws InterruptedException, ProcessFailedException, IOException {
        Optional<Integer> numThreads;
        switch (type) {
            case CGROUP: {
                numThreads = this.getNumThreadsFromCgroup(taskProcess, dockerPid);
                break;
            }
            case PS: {
                numThreads = this.getNumThreadsFromCommand(taskProcess, dockerPid, "ps hH p %s | wc -l");
                break;
            }
            default: {
                numThreads = this.getNumThreadsFromProcStatus(taskProcess, dockerPid);
            }
        }
        return numThreads;
    }

    private Optional<Integer> getNumThreadsFromCommand(SingularityExecutorTaskProcessCallable taskProcess, Optional<Integer> dockerPid, String commandFormat) throws InterruptedException, ProcessFailedException {
        SimpleProcessManager checkThreadsProcessManager = new SimpleProcessManager((Logger)NOPLogger.NOP_LOGGER);
        ImmutableList cmd = ImmutableList.of((Object)"/bin/sh", (Object)"-c", (Object)String.format(commandFormat, dockerPid.orElse((Integer)taskProcess.getCurrentPid().get())));
        List output = checkThreadsProcessManager.runCommandWithOutput((List)cmd);
        if (output.isEmpty()) {
            LOG.warn("Output from ls was empty ({})", (Object)cmd);
            return Optional.empty();
        }
        return Optional.of(Integer.parseInt((String)output.get(0)));
    }

    private Optional<Integer> getNumThreadsFromProcStatus(SingularityExecutorTaskProcessCallable taskProcess, Optional<Integer> dockerPid) throws InterruptedException, IOException {
        Path procStatusPath = Paths.get(String.format("/proc/%s/status", dockerPid.orElse((Integer)taskProcess.getCurrentPid().get())), new String[0]);
        if (Files.exists(procStatusPath, new LinkOption[0])) {
            for (String line : Files.readAllLines(procStatusPath, Charsets.UTF_8)) {
                Matcher matcher = PROC_STATUS_THREADS_REGEX.matcher(line);
                if (!matcher.matches()) continue;
                return Optional.of(Integer.parseInt(matcher.group(1)));
            }
            LOG.warn("Unable to parse threads from proc status file {}", (Object)procStatusPath);
            return Optional.empty();
        }
        LOG.warn("Proc status file does not exist for pid {}", (Object)dockerPid.orElse((Integer)taskProcess.getCurrentPid().get()));
        return Optional.empty();
    }

    private Optional<Integer> getNumThreadsFromCgroup(SingularityExecutorTaskProcessCallable taskProcess, Optional<Integer> dockerPid) throws InterruptedException, IOException {
        Path procCgroupPath = Paths.get(String.format(this.configuration.getProcCgroupFormat(), dockerPid.orElse((Integer)taskProcess.getCurrentPid().get())), new String[0]);
        if (Files.exists(procCgroupPath, new LinkOption[0])) {
            for (String line : Files.readAllLines(procCgroupPath, Charsets.UTF_8)) {
                Matcher matcher = CGROUP_CPU_REGEX.matcher(line);
                if (!matcher.matches()) continue;
                return Optional.of(Files.readAllLines(Paths.get(String.format(this.configuration.getCgroupsMesosCpuTasksFormat(), matcher.group(1)), new String[0]), Charsets.UTF_8).size());
            }
            LOG.warn("Unable to parse cgroup container from {}", (Object)procCgroupPath.toString());
            return Optional.empty();
        }
        LOG.warn("cgroup {} does not exist", (Object)procCgroupPath.toString());
        return Optional.empty();
    }
}

