/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rx.java.test;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.streams.WriteStream;
import io.vertx.lang.rx.test.FakeWriteStream;
import io.vertx.rx.java.RxHelper;
import io.vertx.rx.java.WriteStreamSubscriber;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.VertxTestBase;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Test;
import rx.Emitter;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class WriteStreamSubscriberTest
extends VertxTestBase {
    @Test
    public void testObservableErrorReported() throws Exception {
        Exception expected = new Exception();
        FakeWriteStream writeStream = new FakeWriteStream(this.vertx);
        WriteStreamSubscriber subscriber = RxHelper.toSubscriber((WriteStream)writeStream).onError(throwable -> {
            this.assertThat(throwable, CoreMatchers.is((Matcher)CoreMatchers.sameInstance((Object)expected)));
            this.complete();
        });
        Observable.error((Throwable)expected).observeOn(RxHelper.scheduler((Vertx)this.vertx)).subscribeOn(RxHelper.scheduler((Vertx)this.vertx)).subscribe((Subscriber)subscriber);
        this.await();
        this.assertFalse("Did not expect writeStream end method to be invoked", writeStream.endInvoked());
    }

    @Test
    @Repeat(times=10)
    public void testObservableToWriteStreamVertxThread() throws Exception {
        this.testObservableToWriteStream(RxHelper.scheduler((Context)this.vertx.getOrCreateContext()));
    }

    @Test
    @Repeat(times=10)
    public void testObservableToWriteStreamNonVertxThread() throws Exception {
        this.testObservableToWriteStream(Schedulers.from((Executor)Executors.newFixedThreadPool(5)));
    }

    private void testObservableToWriteStream(Scheduler scheduler) throws Exception {
        this.disableThreadChecks();
        FakeWriteStream writeStream = new FakeWriteStream(this.vertx);
        WriteStreamSubscriber subscriber = RxHelper.toSubscriber((WriteStream)writeStream).onWriteStreamEnd(() -> ((WriteStreamSubscriberTest)this).complete());
        int count = 10000;
        Observable.range((int)0, (int)count).observeOn(scheduler).subscribeOn(scheduler).subscribe((Subscriber)subscriber);
        this.await();
        this.assertTrue("Expected drainHandler to be invoked", writeStream.drainHandlerInvoked());
        this.assertEquals(count, writeStream.getCount());
        this.assertTrue("Expected writeStream end method to be invoked", writeStream.endInvoked());
    }

    @Test
    public void testWriteStreamError() throws Exception {
        this.testWriteStreamError(false);
    }

    @Test
    public void testWriteStreamErrorAfterComplete() throws Exception {
        this.testWriteStreamError(true);
    }

    private void testWriteStreamError(boolean complete) {
        this.waitFor(2);
        RuntimeException expected = new RuntimeException();
        FakeWriteStream writeStream = new FakeWriteStream(this.vertx).failAfterWrite((Throwable)expected);
        WriteStreamSubscriber subscriber = RxHelper.toSubscriber((WriteStream)writeStream).onWriteStreamError(throwable -> {
            this.assertThat(throwable, CoreMatchers.is((Matcher)CoreMatchers.sameInstance((Object)expected)));
            this.complete();
        });
        Observable.create(emitter -> {
            emitter.setCancellation(() -> ((WriteStreamSubscriberTest)this).complete());
            emitter.onNext((Object)0);
            if (complete) {
                emitter.onCompleted();
            }
        }, (Emitter.BackpressureMode)Emitter.BackpressureMode.NONE).observeOn(RxHelper.scheduler((Vertx)this.vertx)).subscribeOn(RxHelper.scheduler((Vertx)this.vertx)).subscribe((Subscriber)subscriber);
        this.await();
        this.assertEquals(complete, writeStream.endInvoked());
    }
}

