/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.rest.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JAX-RS resource serving the {@code /connectors} REST endpoints.
 *
 * <p>Every operation is delegated to the {@link Herder}. Operations that take a callback are
 * completed through {@code completeOrForwardRequest}, which waits for the herder's answer and,
 * when the herder reports a {@link RequestTargetException}, re-issues the request against the
 * URL carried by that exception.
 */
@Path("/connectors")
@Produces("application/json")
@Consumes("application/json")
public class ConnectorsResource {
    private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
    private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
            new TypeReference<List<Map<String, String>>>() { };
    // Shared mapper for decoding raw task-config bodies; avoids building a new one per request.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Default time, in milliseconds, to wait for the herder to complete a request. */
    public static final long REQUEST_TIMEOUT_MS = 90000L;
    // Effective timeout; mutable through setRequestTimeout/resetRequestTimeout.
    private static long requestTimeoutMs = REQUEST_TIMEOUT_MS;

    private final Herder herder;
    private final WorkerConfig config;
    @Context
    private ServletContext context;
    private final boolean isTopicTrackingDisabled;
    private final boolean isTopicTrackingResetDisabled;

    /**
     * Creates the resource.
     *
     * @param herder the herder all connector operations are delegated to
     * @param config worker configuration; read for the {@code topic.tracking.enable} and
     *               {@code topic.tracking.allow.reset} flags and passed to {@link RestClient}
     *               when a request is forwarded
     */
    public ConnectorsResource(Herder herder, WorkerConfig config) {
        this.herder = herder;
        this.config = config;
        this.isTopicTrackingDisabled = !config.getBoolean("topic.tracking.enable");
        this.isTopicTrackingResetDisabled = !config.getBoolean("topic.tracking.allow.reset");
    }

    /**
     * Overrides the time to wait for the herder to complete a request.
     *
     * @param requestTimeoutMs the new timeout in milliseconds
     */
    public static void setRequestTimeout(long requestTimeoutMs) {
        ConnectorsResource.requestTimeoutMs = requestTimeoutMs;
    }

    /** Restores the request timeout to {@link #REQUEST_TIMEOUT_MS}. */
    public static void resetRequestTimeout() {
        requestTimeoutMs = REQUEST_TIMEOUT_MS;
    }

    /**
     * Lists the connectors known to the herder.
     *
     * <p>Without an {@code expand} query parameter the response entity is whatever
     * {@code herder.connectors()} returns. With one or more {@code expand} values the entity is
     * a map keyed by connector name whose values hold the requested expansions
     * ({@code status} and/or {@code info}); unknown expansion values are logged and ignored.
     *
     * @param uriInfo request URI information, used to read the {@code expand} parameter
     * @param headers request headers (unused)
     * @return a 200 response carrying the connector listing
     */
    @GET
    @Path("/")
    public Response listConnectors(@Context UriInfo uriInfo, @Context HttpHeaders headers) {
        if (!uriInfo.getQueryParameters().containsKey("expand")) {
            return Response.ok(this.herder.connectors()).build();
        }
        List<String> expansions = uriInfo.getQueryParameters().get("expand");
        Map<String, Map<String, Object>> out = new HashMap<>();
        for (String connector : this.herder.connectors()) {
            try {
                Map<String, Object> connectorExpansions = new HashMap<>();
                for (String expansion : expansions) {
                    switch (expansion) {
                        case "status":
                            connectorExpansions.put("status", this.herder.connectorStatus(connector));
                            break;
                        case "info":
                            connectorExpansions.put("info", this.herder.connectorInfo(connector));
                            break;
                        default:
                            log.info("Ignoring unknown expansion type {}", expansion);
                    }
                }
                out.put(connector, connectorExpansions);
            } catch (NotFoundException e) {
                // The herder could not resolve this connector; leave it out of the response
                // rather than failing the whole listing.
                log.debug("Unable to get connector info for {} on this worker", connector);
            }
        }
        return Response.ok(out).build();
    }

    /**
     * Creates a new connector from the posted name and configuration.
     *
     * @param forward       forwarding control flag, see {@code completeOrForwardRequest}
     * @param headers       request headers, reused if the request is forwarded
     * @param createRequest the request body holding the connector name and config
     * @return a 201 response whose location is {@code /connectors/{name}} and whose entity is
     *         the resulting {@link ConnectorInfo}
     * @throws BadRequestException if the body or its config is missing, or if the config
     *                             contains a {@code name} different from the request's name
     * @throws Throwable           any failure reported by the herder
     */
    @POST
    @Path("/")
    public Response createConnector(@QueryParam("forward") Boolean forward, @Context HttpHeaders headers, CreateConnectorRequest createRequest) throws Throwable {
        if (createRequest == null) {
            throw new BadRequestException("Request body with connector name and config is required");
        }
        // A missing name becomes "" (not null); a present name is trimmed of surrounding whitespace.
        String name = createRequest.name() == null ? "" : createRequest.name().trim();
        Map<String, String> configs = createRequest.config();
        this.checkAndPutConnectorConfigName(name, configs);

        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        this.herder.putConnectorConfig(name, configs, false, cb);
        Herder.Created<ConnectorInfo> info = this.completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
                new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);

        URI location = UriBuilder.fromUri("/connectors").path(name).build();
        return Response.created(location).entity(info.result()).build();
    }

    /**
     * Returns the herder's {@link ConnectorInfo} for one connector.
     *
     * @param connector connector name from the path
     * @param headers   request headers, reused if the request is forwarded
     * @param forward   forwarding control flag, see {@code completeOrForwardRequest}
     * @throws Throwable any failure reported by the herder
     */
    @GET
    @Path("/{connector}")
    public ConnectorInfo getConnector(@PathParam("connector") String connector, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward) throws Throwable {
        FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
        this.herder.connectorInfo(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
    }

    /**
     * Returns the configuration of one connector as reported by the herder.
     *
     * @param connector connector name from the path
     * @param headers   request headers, reused if the request is forwarded
     * @param forward   forwarding control flag, see {@code completeOrForwardRequest}
     * @throws Throwable any failure reported by the herder
     */
    @GET
    @Path("/{connector}/config")
    public Map<String, String> getConnectorConfig(@PathParam("connector") String connector, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward) throws Throwable {
        FutureCallback<Map<String, String>> cb = new FutureCallback<>();
        this.herder.connectorConfig(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
    }

    /**
     * Returns the per-task configurations of one connector, keyed by task id.
     *
     * @param connector connector name from the path
     * @param headers   request headers, reused if the request is forwarded
     * @param forward   forwarding control flag, see {@code completeOrForwardRequest}
     * @throws Throwable any failure reported by the herder
     */
    @GET
    @Path("/{connector}/tasks-config")
    public Map<ConnectorTaskId, Map<String, String>> getTasksConfig(@PathParam("connector") String connector, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward) throws Throwable {
        FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>();
        this.herder.tasksConfig(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward);
    }

    /**
     * Returns the herder's status for one connector.
     *
     * @param connector connector name from the path
     */
    @GET
    @Path("/{connector}/status")
    public ConnectorStateInfo getConnectorStatus(@PathParam("connector") String connector) {
        return this.herder.connectorStatus(connector);
    }

    /**
     * Returns the active topics of one connector, wrapped in a single-entry map keyed by the
     * connector name reported in the herder's answer.
     *
     * @param connector connector name from the path
     * @throws ConnectRestException with status 403 if topic tracking is disabled
     */
    @GET
    @Path("/{connector}/topics")
    public Response getConnectorActiveTopics(@PathParam("connector") String connector) {
        if (this.isTopicTrackingDisabled) {
            throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking is disabled.");
        }
        ActiveTopicsInfo info = this.herder.connectorActiveTopics(connector);
        return Response.ok(Collections.singletonMap(info.connector(), info)).build();
    }

    /**
     * Asks the herder to reset the active topics of one connector.
     *
     * @param connector connector name from the path
     * @param headers   request headers (unused)
     * @return a 202 response with no entity
     * @throws ConnectRestException with status 403 if topic tracking, or resetting it, is disabled
     */
    @PUT
    @Path("/{connector}/topics/reset")
    public Response resetConnectorActiveTopics(@PathParam("connector") String connector, @Context HttpHeaders headers) {
        if (this.isTopicTrackingDisabled) {
            throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking is disabled.");
        }
        if (this.isTopicTrackingResetDisabled) {
            throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking reset is disabled.");
        }
        this.herder.resetConnectorActiveTopics(connector);
        return Response.accepted().build();
    }

    /**
     * Creates or updates the configuration of the named connector.
     *
     * @param connector       connector name from the path
     * @param headers         request headers, reused if the request is forwarded
     * @param forward         forwarding control flag, see {@code completeOrForwardRequest}
     * @param connectorConfig the new configuration; a {@code name} entry is added when absent
     * @return 201 with a {@code /connectors/{connector}} location when the herder reports the
     *         connector as newly created, otherwise 200; the entity is the {@link ConnectorInfo}
     * @throws BadRequestException if the config is missing or names a different connector
     * @throws Throwable           any failure reported by the herder
     */
    @PUT
    @Path("/{connector}/config")
    public Response putConnectorConfig(@PathParam("connector") String connector, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward, Map<String, String> connectorConfig) throws Throwable {
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        this.checkAndPutConnectorConfigName(connector, connectorConfig);
        this.herder.putConnectorConfig(connector, connectorConfig, true, cb);
        Herder.Created<ConnectorInfo> createdInfo = this.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "PUT", headers, connectorConfig,
                new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);

        Response.ResponseBuilder response;
        if (createdInfo.created()) {
            URI location = UriBuilder.fromUri("/connectors").path(connector).build();
            response = Response.created(location);
        } else {
            response = Response.ok();
        }
        return response.entity(createdInfo.result()).build();
    }

    /**
     * Restarts a connector, optionally together with its tasks.
     *
     * <p>When the built {@link RestartRequest} reports {@code forceRestartConnectorOnly()},
     * only the connector is restarted and the response is 204 with no entity. Otherwise the
     * connector and tasks are restarted through the herder and the response is 202 carrying
     * the resulting {@link ConnectorStateInfo}; in that case {@code includeTasks} and
     * {@code onlyFailed} are propagated as query parameters if the request is forwarded.
     *
     * @param connector    connector name from the path
     * @param headers      request headers, reused if the request is forwarded
     * @param includeTasks {@code includeTasks} query parameter, defaulting to {@code false}
     * @param onlyFailed   {@code onlyFailed} query parameter, defaulting to {@code false}
     * @param forward      forwarding control flag, see {@code completeOrForwardRequest}
     * @throws Throwable any failure reported by the herder
     */
    @POST
    @Path("/{connector}/restart")
    public Response restartConnector(@PathParam("connector") String connector, @Context HttpHeaders headers, @DefaultValue("false") @QueryParam("includeTasks") Boolean includeTasks, @DefaultValue("false") @QueryParam("onlyFailed") Boolean onlyFailed, @QueryParam("forward") Boolean forward) throws Throwable {
        RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks);
        String forwardingPath = "/connectors/" + connector + "/restart";
        if (restartRequest.forceRestartConnectorOnly()) {
            FutureCallback<Void> cb = new FutureCallback<>();
            this.herder.restartConnector(connector, cb);
            this.completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
            return Response.noContent().build();
        }

        FutureCallback<ConnectorStateInfo> cb = new FutureCallback<>();
        this.herder.restartConnectorAndTasks(restartRequest, cb);
        Map<String, String> queryParameters = new HashMap<>();
        queryParameters.put("includeTasks", includeTasks.toString());
        queryParameters.put("onlyFailed", onlyFailed.toString());
        ConnectorStateInfo stateInfo = this.completeOrForwardRequest(cb, forwardingPath, "POST", headers, queryParameters, null,
                new TypeReference<ConnectorStateInfo>() { }, new IdentityTranslator<>(), forward);
        return Response.accepted().entity(stateInfo).build();
    }

    /**
     * Asks the herder to pause a connector.
     *
     * @param connector connector name from the path
     * @param headers   request headers (unused)
     * @return a 202 response with no entity
     */
    @PUT
    @Path("/{connector}/pause")
    public Response pauseConnector(@PathParam("connector") String connector, @Context HttpHeaders headers) {
        this.herder.pauseConnector(connector);
        return Response.accepted().build();
    }

    /**
     * Asks the herder to resume a connector.
     *
     * @param connector connector name from the path
     * @return a 202 response with no entity
     */
    @PUT
    @Path("/{connector}/resume")
    public Response resumeConnector(@PathParam("connector") String connector) {
        this.herder.resumeConnector(connector);
        return Response.accepted().build();
    }

    /**
     * Returns the {@link TaskInfo} list the herder reports for one connector.
     *
     * @param connector connector name from the path
     * @param headers   request headers, reused if the request is forwarded
     * @param forward   forwarding control flag, see {@code completeOrForwardRequest}
     * @throws Throwable any failure reported by the herder
     */
    @GET
    @Path("/{connector}/tasks")
    public List<TaskInfo> getTaskConfigs(@PathParam("connector") String connector, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward) throws Throwable {
        FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
        this.herder.taskConfigs(connector, cb);
        return this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null,
                new TypeReference<List<TaskInfo>>() { }, forward);
    }

    /**
     * Submits a new set of task configurations for one connector.
     *
     * <p>The body is received as raw bytes because the same bytes are handed to
     * {@link InternalRequestSignature#fromHeaders} together with the request headers, in
     * addition to being decoded into the task-config list.
     *
     * @param connector   connector name from the path
     * @param headers     request headers, used for the signature and reused when forwarding
     * @param forward     forwarding control flag, see {@code completeOrForwardRequest}
     * @param requestBody raw JSON body: a list of string-to-string maps
     * @throws Throwable a JSON decoding failure or any failure reported by the herder
     */
    @POST
    @Path("/{connector}/tasks")
    public void putTaskConfigs(@PathParam("connector") String connector, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward, byte[] requestBody) throws Throwable {
        List<Map<String, String>> taskConfigs = OBJECT_MAPPER.readValue(requestBody, TASK_CONFIGS_TYPE);
        FutureCallback<Void> cb = new FutureCallback<>();
        this.herder.putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(requestBody, headers));
        this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
    }

    /**
     * Returns the herder's status for one task of a connector.
     *
     * @param connector connector name from the path
     * @param headers   request headers (unused)
     * @param task      task number from the path
     */
    @GET
    @Path("/{connector}/tasks/{task}/status")
    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String connector, @Context HttpHeaders headers, @PathParam("task") Integer task) {
        return this.herder.taskStatus(new ConnectorTaskId(connector, task));
    }

    /**
     * Restarts one task of a connector.
     *
     * @param connector connector name from the path
     * @param task      task number from the path
     * @param headers   request headers, reused if the request is forwarded
     * @param forward   forwarding control flag, see {@code completeOrForwardRequest}
     * @throws Throwable any failure reported by the herder
     */
    @POST
    @Path("/{connector}/tasks/{task}/restart")
    public void restartTask(@PathParam("connector") String connector, @PathParam("task") Integer task, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward) throws Throwable {
        FutureCallback<Void> cb = new FutureCallback<>();
        ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
        this.herder.restartTask(taskId, cb);
        this.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
    }

    /**
     * Deletes a connector's configuration through the herder.
     *
     * @param connector connector name from the path
     * @param headers   request headers, reused if the request is forwarded
     * @param forward   forwarding control flag, see {@code completeOrForwardRequest}
     * @throws Throwable any failure reported by the herder
     */
    @DELETE
    @Path("/{connector}")
    public void destroyConnector(@PathParam("connector") String connector, @Context HttpHeaders headers, @QueryParam("forward") Boolean forward) throws Throwable {
        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
        this.herder.deleteConnectorConfig(connector, cb);
        this.completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
    }

    /**
     * Ensures the {@code name} entry of a connector config agrees with the connector name of
     * the request, inserting it when the config does not carry one.
     *
     * @param connectorName   the name taken from the request (URL or body)
     * @param connectorConfig the submitted config; mutated when {@code name} is absent
     * @throws BadRequestException if the config is {@code null} or its {@code name} entry
     *                             differs from {@code connectorName}
     */
    private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
        if (connectorConfig == null) {
            // Reject a missing config as a client error instead of failing later with an NPE.
            throw new BadRequestException("Connector config must be provided and must not be null");
        }
        String includedName = connectorConfig.get("name");
        if (includedName == null) {
            connectorConfig.put("name", connectorName);
        } else if (!includedName.equals(connectorName)) {
            throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connectorName + ")");
        }
    }

    /**
     * Waits for the herder callback to complete and returns its value, forwarding the request
     * to another worker when the herder reports a {@link RequestTargetException}.
     *
     * <p>Forwarding is governed by {@code forward}:
     * <ul>
     *   <li>{@code null}: the request is forwarded with {@code forward=true};</li>
     *   <li>{@code true}: the request is forwarded with {@code forward=false};</li>
     *   <li>{@code false}: the request is not forwarded and fails with 409.</li>
     * </ul>
     *
     * @param cb              callback the herder completes
     * @param path            path appended to the forward URL
     * @param method          HTTP method of the forwarded request
     * @param headers         headers passed along with the forwarded request
     * @param queryParameters extra query parameters for the forwarded request; may be {@code null}
     * @param body            body of the forwarded request; may be {@code null}
     * @param resultType      type the forwarded response body is read as; may be {@code null}
     * @param translator      converts the forwarded HTTP response into the result type
     * @param forward         forwarding control flag as described above
     * @return the callback's value, or the translated response of the forwarded request
     * @throws ConnectRestException with 409 when the request cannot be forwarded or the herder
     *                              reports a {@link RebalanceNeededException}; with 500 on
     *                              timeout or interruption
     * @throws Throwable            any other cause the callback failed with
     */
    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Map<String, String> queryParameters, Object body, TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
        try {
            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RequestTargetException) {
                if (forward != null && !forward) {
                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
                }
                // Only a request without an explicit flag may be forwarded again by its receiver.
                boolean recursiveForward = forward == null;
                String forwardedUrl = ((RequestTargetException) cause).forwardUrl();
                if (forwardedUrl == null) {
                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to no known leader URL, likely because a rebalance was underway.");
                }
                UriBuilder uriBuilder = UriBuilder.fromUri(forwardedUrl).path(path).queryParam("forward", recursiveForward);
                if (queryParameters != null) {
                    queryParameters.forEach((key, value) -> uriBuilder.queryParam(key, value));
                }
                String forwardUrl = uriBuilder.build().toString();
                log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
                return translator.translate(RestClient.httpRequest(forwardUrl, method, headers, body, resultType, this.config));
            }
            if (cause instanceof RebalanceNeededException) {
                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
            }
            throw cause;
        } catch (TimeoutException e) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
        }
    }

    /** Variant of {@code completeOrForwardRequest} without extra query parameters. */
    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
        return this.completeOrForwardRequest(cb, path, method, headers, null, body, resultType, translator, forward);
    }

    /** Variant of {@code completeOrForwardRequest} whose forwarded response body is returned as-is. */
    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, TypeReference<T> resultType, Boolean forward) throws Throwable {
        return this.completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<>(), forward);
    }

    /** Variant of {@code completeOrForwardRequest} that passes no result type for the forwarded response. */
    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, Boolean forward) throws Throwable {
        return this.completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<>(), forward);
    }

    /**
     * Rebuilds a {@link Herder.Created} from a forwarded response: the connector counts as
     * created when the response status is 201.
     */
    private static class CreatedConnectorInfoTranslator
            implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
        @Override
        public Herder.Created<ConnectorInfo> translate(RestClient.HttpResponse<ConnectorInfo> response) {
            boolean created = response.status() == 201;
            return new Herder.Created<>(created, response.body());
        }
    }

    /** Returns the body of a forwarded response unchanged. */
    private static class IdentityTranslator<T> implements Translator<T, T> {
        @Override
        public T translate(RestClient.HttpResponse<T> response) {
            return response.body();
        }
    }

    /**
     * Converts the HTTP response of a forwarded request into the value a local herder call
     * would have produced.
     *
     * @param <T> the type returned to the caller
     * @param <U> the type of the forwarded response body
     */
    private interface Translator<T, U> {
        T translate(RestClient.HttpResponse<U> response);
    }
}

