/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.reactivex.test;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableSource;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.rxjava3.core.AbstractVerticle;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.eventbus.DeliveryContext;
import io.vertx.rxjava3.core.eventbus.EventBus;
import io.vertx.rxjava3.core.http.HttpClient;
import io.vertx.rxjava3.core.http.HttpClientRequest;
import io.vertx.rxjava3.core.http.HttpClientResponse;
import io.vertx.rxjava3.core.http.HttpServerResponse;
import io.vertx.rxjava3.core.http.WebSocket;
import io.vertx.rxjava3.core.parsetools.RecordParser;
import io.vertx.test.core.VertxTestBase;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;

public class CoreApiTest
extends VertxTestBase {
    private Vertx vertx;

    public void setUp() throws Exception {
        super.setUp();
        this.vertx = new Vertx(((VertxTestBase)this).vertx);
    }

    @Test
    public void testDeployVerticle() throws Exception {
        final CountDownLatch deployLatch = new CountDownLatch(2);
        RxHelper.deployVerticle((Vertx)this.vertx, (Verticle)new AbstractVerticle(){

            public void start() {
                deployLatch.countDown();
            }
        }).subscribe(resp -> deployLatch.countDown());
        this.awaitLatch(deployLatch);
    }

    @Test
    public void testWebSocket() {
        this.waitFor(2);
        AtomicLong serverReceived = new AtomicLong();
        this.vertx.createHttpServer(new HttpServerOptions().setIdleTimeout(2)).webSocketHandler(ws -> ws.toFlowable().subscribe(msg -> {
            serverReceived.incrementAndGet();
            ws.writeTextMessage("pong");
        }, err -> {
            this.assertEquals(1L, serverReceived.get());
            this.complete();
        }, () -> ((CoreApiTest)this).fail())).listen(8080, "localhost").blockingGet();
        HttpClient client = this.vertx.createHttpClient();
        AtomicLong clientReceived = new AtomicLong();
        client.rxWebSocket(8080, "localhost", "/").doAfterSuccess(ws -> ws.writeTextMessage("ping")).flatMapPublisher(WebSocket::toFlowable).subscribe(msg -> clientReceived.incrementAndGet(), err -> {
            this.assertEquals(1L, clientReceived.get());
            this.complete();
        }, () -> ((CoreApiTest)this).fail());
        this.await();
    }

    @Test
    public void testHttpClient() {
        this.vertx.createHttpServer().requestHandler(req -> req.response().end("Hello World")).listen(8080, "localhost").blockingGet();
        HttpClient client = this.vertx.createHttpClient();
        Buffer result = (Buffer)client.rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMap(request -> request.rxSend().flatMap(HttpClientResponse::body)).blockingGet();
        this.assertEquals("Hello World", result.toString());
    }

    @Test
    public void testHttpClientResponseStream() {
        this.vertx.createHttpServer().requestHandler(req -> {
            AtomicInteger counter = new AtomicInteger();
            HttpServerResponse response = req.response().setChunked(true);
            this.vertx.setPeriodic(10L, id -> {
                int cnt = counter.getAndIncrement();
                if (cnt < 10) {
                    response.write("" + cnt);
                } else {
                    this.vertx.cancelTimer(id.longValue());
                    response.end();
                }
            });
        }).listen(8080, "localhost").blockingGet();
        HttpClient client = this.vertx.createHttpClient();
        String result = (String)client.rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMapPublisher(request -> request.rxSend().flatMapPublisher(HttpClientResponse::toFlowable)).reduce((Object)"", (s, b) -> s + b).blockingGet();
        this.assertEquals("0123456789", result);
    }

    @Test
    public void shouldRemoveInterceptor() {
        String headerName = UUID.randomUUID().toString();
        String headerValue = UUID.randomUUID().toString();
        Handler interceptor = dc -> {
            dc.message().headers().add(headerName, headerValue);
            dc.next();
        };
        EventBus eventBus = this.vertx.eventBus();
        eventBus.addInboundInterceptor(interceptor);
        eventBus.consumer("foo", msg -> msg.reply((Object)msg.headers().get(headerName))).completionHandler().andThen((CompletableSource)eventBus.rxRequest("foo", (Object)"bar").flatMapCompletable(reply -> {
            if (reply.body().equals(headerValue)) {
                return Completable.complete();
            }
            return Completable.error((Throwable)new NoStackTraceThrowable("Expected msg to be intercepted"));
        })).andThen((CompletableSource)Completable.fromAction(() -> eventBus.removeInboundInterceptor(interceptor))).andThen((CompletableSource)eventBus.rxRequest("foo", (Object)"bar").flatMapCompletable(reply -> {
            if (reply.body() == null) {
                return Completable.complete();
            }
            return Completable.error((Throwable)new NoStackTraceThrowable("Expected msg not to be intercepted"));
        })).subscribe(() -> this.testComplete(), throwable -> this.fail((Throwable)throwable));
        this.await();
    }

    @Test
    public void testPipeFailureShouldUnsubscribe() throws Exception {
        this.vertx.createHttpServer().requestHandler(req -> {
            Flowable f = Flowable.generate(() -> 0L, (state, emitter) -> {
                emitter.onNext((Object)Buffer.buffer((String)("Chunk " + state + "\n")));
                return state + 1L;
            }).delay(100L, TimeUnit.MILLISECONDS).rebatchRequests(1).doOnCancel(() -> ((CoreApiTest)this).testComplete());
            req.response().send(f);
        }).rxListen(8080, "localhost").blockingGet();
        HttpClient client = this.vertx.createHttpClient();
        client.rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMap(HttpClientRequest::rxSend).subscribe(resp -> resp.toFlowable().take(5L).subscribe(buff -> {}, arg_0 -> ((CoreApiTest)this).fail(arg_0), () -> resp.request().reset()), arg_0 -> ((CoreApiTest)this).fail(arg_0));
        this.await();
    }

    @Test
    public void testRecordParser() {
        Single source = this.vertx.fileSystem().rxOpen("src/test/resources/test.txt", new OpenOptions());
        this.waitFor(5);
        source.map(file -> RecordParser.newDelimited((String)"\n", (Flowable)file.toFlowable())).flatMapObservable(RecordParser::toObservable).doOnNext(v -> this.complete()).doOnComplete(() -> this.complete()).ignoreElements().subscribe(() -> this.complete(), arg_0 -> ((CoreApiTest)this).fail(arg_0));
        this.await();
    }
}

