/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow.http;

import io.activej.codec.StructuredCodec;
import io.activej.codec.StructuredCodecs;
import io.activej.codec.StructuredEncoder;
import io.activej.codec.json.JsonUtils;
import io.activej.common.exception.UncheckedException;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.net.Messaging;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandGetTasks;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.command.DataflowResponsePartitionData;
import io.activej.dataflow.command.DataflowResponseResult;
import io.activej.dataflow.command.DataflowResponseTaskData;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.TaskStatus;
import io.activej.dataflow.http.LocalTaskData;
import io.activej.dataflow.http.ReducedTaskData;
import io.activej.dataflow.inject.CodecsModule;
import io.activej.dataflow.stats.NodeStat;
import io.activej.dataflow.stats.StatReducer;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpException;
import io.activej.http.HttpMethod;
import io.activej.http.HttpRequest;
import io.activej.http.HttpResponse;
import io.activej.http.RoutingServlet;
import io.activej.http.StaticServlet;
import io.activej.inject.Key;
import io.activej.inject.ResourceLocator;
import io.activej.inject.util.Types;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.promise.Promisable;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class DataflowDebugServlet
implements AsyncServlet {
    // Routing servlet built in the constructor; serve() delegates every request to it.
    private final AsyncServlet servlet;
    // Codec handed to MessagingWithBinaryStreaming.create(...) whenever a partition is queried.
    private final ByteBufsCodec<DataflowResponse, DataflowCommand> codec;

    /**
     * Builds the routing table served by this servlet.
     *
     * <ul>
     *   <li>{@code /*} - static content loaded from the {@code debug} class-path directory</li>
     *   <li>{@code GET /api/partitions} - JSON array of {@code "host:port"} strings, one per partition</li>
     *   <li>{@code GET /api/tasks} - queries every partition and returns a map from task id to a list of
     *       statuses; the list position equals the partition's position in {@code partitions} and holds
     *       {@code null} where that partition did not report the task</li>
     *   <li>{@code GET /api/tasks/:taskID} - queries every partition for one task and returns the
     *       per-partition statuses, a graph and the node statistics reduced across partitions</li>
     *   <li>{@code GET /api/tasks/:taskID/:index} - returns the task data of the partition at position
     *       {@code index}; responds with code 400 when the index is not a valid position</li>
     * </ul>
     *
     * @param partitions partitions to query; their order defines the positions used in the responses
     * @param executor   executor passed to the static class-path servlet
     * @param codec      codec used for the messaging connection to each partition
     * @param env        locator used to obtain the JSON codecs and the {@link StatReducer} instances
     */
    public DataflowDebugServlet(List<Partition> partitions, Executor executor, ByteBufsCodec<DataflowResponse, DataflowCommand> codec, ResourceLocator env) {
        this.codec = codec;

        StructuredCodec<ReducedTaskData> reducedTaskDataCodec = env.getInstance(CodecsModule.codec(ReducedTaskData.class));
        StructuredCodec<LocalTaskData> localTaskDataCodec = env.getInstance(CodecsModule.codec(LocalTaskData.class));
        StructuredCodec<TaskStatus> taskStatusCodec = env.getInstance(CodecsModule.codec(TaskStatus.class));
        StructuredCodec<Map<Long, List<@Nullable TaskStatus>>> taskListCodec =
                StructuredCodecs.ofMap(StructuredCodecs.LONG_CODEC, StructuredCodecs.ofList(taskStatusCodec.nullable()));

        AsyncServlet staticContent = StaticServlet.ofClassPath(executor, "debug").withIndexHtml();

        RoutingServlet api = RoutingServlet.create()
                .map(HttpMethod.GET, "/partitions", request -> HttpResponse.ok200()
                        .withJson(partitions.stream()
                                .map(p -> "\"" + p.getAddress().getAddress().getHostAddress() + ":" + p.getAddress().getPort() + "\"")
                                .collect(Collectors.joining(",", "[", "]"))))
                .map(HttpMethod.GET, "/tasks", request -> Promises.toList(partitions.stream().map(p -> this.getPartitionData(p.getAddress())))
                        .map(partitionStats -> {
                            int partitionCount = partitionStats.size();
                            Map<Long, List<@Nullable TaskStatus>> tasks = new HashMap<>();
                            for (int i = 0; i < partitionCount; i++) {
                                DataflowResponsePartitionData partitionStat = partitionStats.get(i);
                                for (DataflowResponsePartitionData.TaskDesc taskDesc : partitionStat.getLast()) {
                                    // Arrays.asList over a fresh array gives a fixed-size list pre-filled with
                                    // nulls, so every partition position can be set() independently.
                                    tasks.computeIfAbsent(taskDesc.getId(), key -> Arrays.asList(new TaskStatus[partitionCount]))
                                            .set(i, taskDesc.getStatus());
                                }
                            }
                            return HttpResponse.ok200().withJson(JsonUtils.toJson(taskListCodec, tasks));
                        }))
                .map(HttpMethod.GET, "/tasks/:taskID", request -> DataflowDebugServlet.getTaskId(request)
                        .then(id -> Promises.toList(partitions.stream().map(p -> this.getTask(p.getAddress(), id)).collect(Collectors.toList()))
                                .then(localStats -> {
                                    // The graph is taken from the first partition that returned data. Without
                                    // this guard an empty partition list made localStats.get(0) throw
                                    // IndexOutOfBoundsException inside the promise callback.
                                    Optional<DataflowResponseTaskData> firstTaskData = localStats.stream().filter(Objects::nonNull).findFirst();
                                    if (!firstTaskData.isPresent()) {
                                        return Promise.ofException(HttpException.ofCode(404, "No data for task " + id));
                                    }

                                    int partitionCount = localStats.size();
                                    List<@Nullable TaskStatus> statuses = Arrays.asList(new TaskStatus[partitionCount]);
                                    Map<Integer, List<@Nullable NodeStat>> nodeStats = new HashMap<>();
                                    for (int i = 0; i < partitionCount; i++) {
                                        DataflowResponseTaskData localTaskData = localStats.get(i);
                                        if (localTaskData == null) {
                                            continue;
                                        }
                                        statuses.set(i, localTaskData.getStatus());
                                        int partitionIndex = i;
                                        localTaskData.getNodes().forEach((index, nodeStat) ->
                                                nodeStats.computeIfAbsent(index, key -> Arrays.asList(new NodeStat[partitionCount]))
                                                        .set(partitionIndex, nodeStat));
                                    }

                                    // Nodes for which reduce(...) yields null are left out of the response.
                                    Map<Integer, NodeStat> reduced = new HashMap<>();
                                    nodeStats.forEach((index, stats) -> {
                                        NodeStat merged = DataflowDebugServlet.reduce(stats, env);
                                        if (merged != null) {
                                            reduced.put(index, merged);
                                        }
                                    });

                                    ReducedTaskData taskData = new ReducedTaskData(statuses, firstTaskData.get().getGraphViz(), reduced);
                                    return Promise.of(HttpResponse.ok200().withJson(JsonUtils.toJson(reducedTaskDataCodec, taskData)));
                                })))
                .map(HttpMethod.GET, "/tasks/:taskID/:index", request -> DataflowDebugServlet.getTaskId(request)
                        .then(id -> {
                            String indexParam = request.getPathParameter("index");
                            Partition partition;
                            try {
                                partition = partitions.get(Integer.parseInt(indexParam));
                            } catch (IndexOutOfBoundsException | NumberFormatException e) {
                                return Promise.ofException(HttpException.ofCode(400, "Bad index"));
                            }
                            return this.getTask(partition.getAddress(), id)
                                    .map(task -> HttpResponse.ok200().withJson(JsonUtils.toJson(localTaskDataCodec,
                                            new LocalTaskData(task.getStatus(), task.getGraphViz(), task.getNodes(), task.getStartTime(), task.getFinishTime(), task.getErrorString()))));
                        }));

        this.servlet = RoutingServlet.create()
                .map("/*", staticContent)
                .map("/api/*", api);
    }

    /**
     * Combines the statistics collected for one node into a single {@link NodeStat}.
     *
     * <p>The reducer is looked up in {@code env} under the key {@code StatReducer<X>}, where {@code X}
     * is the runtime class of a non-null element of {@code stats}.
     *
     * @param stats statistics to combine; elements may be {@code null}
     * @param env   locator queried for the matching {@link StatReducer}
     * @return the reducer's result, or {@code null} when {@code stats} holds no non-null element or
     *         no reducer is available for that class
     */
    @Nullable
    private static NodeStat reduce(List<NodeStat> stats, ResourceLocator env) {
        NodeStat sample = stats.stream().filter(Objects::nonNull).findAny().orElse(null);
        if (sample == null) {
            return null;
        }
        Type reducerType = Types.parameterized(StatReducer.class, (Type[])new Type[]{sample.getClass()});
        StatReducer reducer = (StatReducer)env.getInstanceOrNull(Key.ofType((Type)reducerType));
        return reducer == null ? null : reducer.reduce(stats);
    }

    /**
     * Reads the {@code taskID} path parameter of the request and parses it as a {@code long}.
     *
     * @param request request whose {@code taskID} path parameter is read
     * @return a promise of the parsed id, or a promise failed with an {@link HttpException} of
     *         code 400 when the parameter is not a valid number
     */
    private static Promise<Long> getTaskId(HttpRequest request) {
        String param = request.getPathParameter("taskID");
        try {
            // No cast to Object here: the promise has to stay a Promise<Long>.
            return Promise.of(Long.parseLong(param));
        }
        catch (NumberFormatException e) {
            return Promise.ofException(HttpException.ofCode(400, "Bad number " + param));
        }
    }

    /**
     * Connects to {@code address}, sends a {@link DataflowCommandGetTasks} command with a
     * {@code null} task id and converts the single response that is received.
     *
     * @param address address of the partition to query
     * @return a promise of the partition data; it fails with an {@link Exception} when the server
     *         answers with a {@link DataflowResponseResult} or any other response type
     */
    private Promise<DataflowResponsePartitionData> getPartitionData(InetSocketAddress address) {
        return AsyncTcpSocketNio.connect(address)
                .then(socket -> {
                    Messaging<DataflowResponse, DataflowCommand> channel = MessagingWithBinaryStreaming.create(socket, this.codec);
                    return channel.send(new DataflowCommandGetTasks(null))
                            .then(sent -> channel.receive())
                            .then(response -> {
                                // The channel is closed as soon as the response has arrived.
                                channel.close();
                                if (response instanceof DataflowResponsePartitionData) {
                                    return Promise.of((DataflowResponsePartitionData) response);
                                }
                                String message = response instanceof DataflowResponseResult
                                        ? "Error on remote server " + address + ": " + ((DataflowResponseResult) response).getError()
                                        : "Bad response from server";
                                return Promise.ofException(new Exception(message));
                            });
                });
    }

    /**
     * Connects to {@code address}, sends a {@link DataflowCommandGetTasks} command for
     * {@code taskId} and converts the single response that is received.
     *
     * @param address address of the partition to query
     * @param taskId  id of the task to request
     * @return a promise of the task data; it fails with an {@link Exception} when the server
     *         answers with a {@link DataflowResponseResult} or any other response type
     */
    private Promise<DataflowResponseTaskData> getTask(InetSocketAddress address, long taskId) {
        return AsyncTcpSocketNio.connect(address)
                .then(socket -> {
                    Messaging<DataflowResponse, DataflowCommand> channel = MessagingWithBinaryStreaming.create(socket, this.codec);
                    return channel.send(new DataflowCommandGetTasks(taskId))
                            .then(sent -> channel.receive())
                            .then(response -> {
                                // The channel is closed as soon as the response has arrived.
                                channel.close();
                                if (response instanceof DataflowResponseTaskData) {
                                    return Promise.of((DataflowResponseTaskData) response);
                                }
                                String message = response instanceof DataflowResponseResult
                                        ? "Error on remote server " + address + ": " + ((DataflowResponseResult) response).getError()
                                        : "Bad response from server";
                                return Promise.ofException(new Exception(message));
                            });
                });
    }

    /**
     * Serves the request by delegating to the routing servlet built in the constructor.
     *
     * @param request incoming HTTP request
     * @return whatever the routing servlet returns for {@code request}
     */
    @NotNull
    @Override
    public Promisable<HttpResponse> serve(@NotNull HttpRequest request) throws UncheckedException {
        return this.servlet.serve(request);
    }

    /**
     * Closes {@code messaging} and turns the received response into the result of a task request.
     *
     * @param messaging channel to close
     * @param address   remote address, used only in the error message
     * @param response  response received from the remote server
     * @return a promise of the response when it is a {@link DataflowResponseTaskData}, otherwise a
     *         promise failed with an {@link Exception} describing the problem
     */
    private static /* synthetic */ Promise lambda$getTask$20(Messaging messaging, InetSocketAddress address, DataflowResponse response) {
        messaging.close();
        if (!(response instanceof DataflowResponseTaskData)) {
            // A DataflowResponseResult carries the remote error; anything else is unexpected.
            String message = response instanceof DataflowResponseResult
                    ? "Error on remote server " + address + ": " + ((DataflowResponseResult)response).getError()
                    : "Bad response from server";
            return Promise.ofException((Throwable)new Exception(message));
        }
        return Promise.of((Object)((DataflowResponseTaskData)response));
    }

    /**
     * Starts receiving the next message from {@code messaging}.
     *
     * @param messaging channel to receive from
     * @param $         unused value of the preceding promise
     * @return the promise returned by {@code messaging.receive()}
     */
    private static /* synthetic */ Promise lambda$getTask$19(Messaging messaging, Void $) {
        return messaging.receive();
    }

    /**
     * Closes {@code messaging} and turns the received response into the result of a partition
     * data request.
     *
     * @param messaging channel to close
     * @param address   remote address, used only in the error message
     * @param response  response received from the remote server
     * @return a promise of the response when it is a {@link DataflowResponsePartitionData},
     *         otherwise a promise failed with an {@link Exception} describing the problem
     */
    private static /* synthetic */ Promise lambda$getPartitionData$17(Messaging messaging, InetSocketAddress address, DataflowResponse response) {
        messaging.close();
        if (!(response instanceof DataflowResponsePartitionData)) {
            // A DataflowResponseResult carries the remote error; anything else is unexpected.
            String message = response instanceof DataflowResponseResult
                    ? "Error on remote server " + address + ": " + ((DataflowResponseResult)response).getError()
                    : "Bad response from server";
            return Promise.ofException((Throwable)new Exception(message));
        }
        return Promise.of((Object)((DataflowResponsePartitionData)response));
    }

    /**
     * Starts receiving the next message from {@code messaging}.
     *
     * @param messaging channel to receive from
     * @param $         unused value of the preceding promise
     * @return the promise returned by {@code messaging.receive()}
     */
    private static /* synthetic */ Promise lambda$getPartitionData$16(Messaging messaging, Void $) {
        return messaging.receive();
    }
}

