/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rx.java.test;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.rx.java.ContextScheduler;
import io.vertx.test.core.VertxTestBase;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

public class SchedulerTest
extends VertxTestBase {
    @After
    public void after() throws Exception {
        Method meth = RxJavaPlugins.class.getDeclaredMethod("reset", new Class[0]);
        meth.setAccessible(true);
        meth.invoke((Object)RxJavaPlugins.getInstance(), new Object[0]);
    }

    private void assertEventLoopThread() {
        String name = Thread.currentThread().getName();
        this.assertTrue("Was expecting event loop thread instead of " + name, name.startsWith("vert.x-eventloop-thread"));
    }

    private void assertWorkerThread() {
        String name = Thread.currentThread().getName();
        this.assertTrue("Was expecting worker thread instead of " + name, name.startsWith("vert.x-worker-thread"));
    }

    @Test
    public void testScheduleImmediatly() {
        this.testScheduleImmediatly(true);
    }

    @Test
    public void testScheduleImmediatlyBlocking() {
        this.testScheduleImmediatly(false);
    }

    private void testScheduleImmediatly(boolean blocking) {
        ContextScheduler scheduler = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler.createWorker();
        worker.schedule(() -> {
            if (blocking) {
                this.assertWorkerThread();
            } else {
                this.assertEventLoopThread();
            }
            this.testComplete();
        }, 0L, TimeUnit.MILLISECONDS);
        this.await();
    }

    @Test
    public void testScheduleObserveOnReturnsOnTheCorrectThread() {
        Context testContext = this.vertx.getOrCreateContext();
        testContext.runOnContext(v -> {
            ContextScheduler scheduler = new ContextScheduler(testContext, false);
            Observable observable = Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<String>(){

                public void call(Subscriber<? super String> subscriber) {
                    SchedulerTest.this.assertFalse(Context.isOnVertxThread());
                    subscriber.onNext((Object)"expected");
                    subscriber.onCompleted();
                }
            }).observeOn((Scheduler)scheduler).doOnNext(o -> this.assertEquals(Vertx.currentContext(), testContext));
            new Thread(() -> observable.subscribe(item -> this.assertEquals("expected", item), arg_0 -> ((SchedulerTest)this).fail(arg_0), () -> ((SchedulerTest)this).testComplete())).start();
        });
        this.await();
    }

    @Test
    public void testScheduleWithDelayObserveOnReturnsOnTheCorrectThread() {
        Context testContext = this.vertx.getOrCreateContext();
        testContext.runOnContext(v -> {
            ContextScheduler scheduler = new ContextScheduler(testContext, false);
            Observable observable = Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<String>(){

                public void call(Subscriber<? super String> subscriber) {
                    SchedulerTest.this.assertFalse(Context.isOnVertxThread());
                    subscriber.onNext((Object)"expected");
                    subscriber.onCompleted();
                }
            }).delay(10L, TimeUnit.MILLISECONDS, (Scheduler)scheduler).doOnNext(o -> this.assertEquals(Vertx.currentContext(), testContext));
            new Thread(() -> observable.subscribe(item -> this.assertEquals("expected", item), arg_0 -> ((SchedulerTest)this).fail(arg_0), () -> ((SchedulerTest)this).testComplete())).start();
        });
        this.await();
    }

    @Test
    public void testScheduleDelayed() {
        this.testScheduleDelayed(false);
    }

    @Test
    public void testScheduleDelayedBlocking() {
        this.testScheduleDelayed(true);
    }

    private void testScheduleDelayed(boolean blocking) {
        ContextScheduler scheduler2 = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler2.createWorker();
        long time = System.currentTimeMillis();
        worker.schedule(() -> {
            if (blocking) {
                this.assertWorkerThread();
            } else {
                this.assertEventLoopThread();
            }
            this.assertTrue(System.currentTimeMillis() - time >= 40L);
            this.testComplete();
        }, 40L, TimeUnit.MILLISECONDS);
        this.await();
    }

    @Test
    public void testSchedulePeriodic() {
        this.testSchedulePeriodic(false);
    }

    @Test
    public void testSchedulePeriodicBlocking() {
        this.testSchedulePeriodic(true);
    }

    private void testSchedulePeriodic(boolean blocking) {
        ContextScheduler scheduler2 = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler2.createWorker();
        AtomicLong time = new AtomicLong(System.currentTimeMillis() - 40L);
        AtomicInteger count = new AtomicInteger();
        AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
        sub.set(worker.schedulePeriodically(() -> {
            if (blocking) {
                this.assertWorkerThread();
            } else {
                this.assertEventLoopThread();
            }
            if (count.incrementAndGet() > 2) {
                ((Subscription)sub.get()).unsubscribe();
                this.testComplete();
            } else {
                long now = System.currentTimeMillis();
                long delta = now - time.get();
                this.assertTrue("" + delta, delta >= 40L);
                time.set(now);
            }
        }, 0L, 40L, TimeUnit.MILLISECONDS));
        this.await();
    }

    @Test
    public void testUnsubscribeBeforeExecute() throws Exception {
        this.testUnsubscribeBeforeExecute(false);
    }

    @Test
    public void testUnsubscribeBeforeExecuteBlocking() throws Exception {
        this.testUnsubscribeBeforeExecute(true);
    }

    private void testUnsubscribeBeforeExecute(boolean blocking) throws Exception {
        ContextScheduler scheduler2 = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler2.createWorker();
        CountDownLatch latch = new CountDownLatch(1);
        Subscription sub = worker.schedule(latch::countDown, 20L, TimeUnit.MILLISECONDS);
        sub.unsubscribe();
        this.assertFalse(latch.await(40L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testUnsubscribeDuringExecute() throws Exception {
        this.testUnsubscribeDuringExecute(false);
    }

    @Test
    public void testUnsubscribeDuringExecuteBlocking() throws Exception {
        this.testUnsubscribeDuringExecute(true);
    }

    private void testUnsubscribeDuringExecute(boolean blocking) throws Exception {
        ContextScheduler scheduler2 = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler2.createWorker();
        AtomicInteger count = new AtomicInteger();
        AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
        sub.set(worker.schedulePeriodically(() -> {
            if (count.getAndIncrement() == 0) {
                ((Subscription)sub.get()).unsubscribe();
            }
        }, 0L, 5L, TimeUnit.MILLISECONDS));
        Thread.sleep(60L);
        this.assertEquals(1L, count.get());
    }

    @Test
    public void testUnsubscribeBetweenActions() throws Exception {
        this.testUnsubscribeBetweenActions(false);
    }

    @Test
    public void testUnsubscribeBetweenActionsBlocking() throws Exception {
        this.testUnsubscribeBetweenActions(true);
    }

    private void testUnsubscribeBetweenActions(boolean blocking) throws Exception {
        ContextScheduler scheduler2 = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler2.createWorker();
        AtomicInteger count = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
        sub.set(worker.schedulePeriodically(() -> {
            if (count.incrementAndGet() == 4) {
                latch.countDown();
            }
        }, 0L, 20L, TimeUnit.MILLISECONDS));
        this.awaitLatch(latch);
        ((Subscription)sub.get()).unsubscribe();
        Thread.sleep(60L);
        this.assertEquals(4L, count.get());
    }

    @Test
    public void testWorkerUnsubscribe() throws Exception {
        this.testWorkerUnsubscribe(false);
    }

    @Test
    public void testWorkerUnsubscribeBlocking() throws Exception {
        this.testWorkerUnsubscribe(true);
    }

    private void testWorkerUnsubscribe(boolean blocking) throws Exception {
        ContextScheduler scheduler2 = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler2.createWorker();
        CountDownLatch latch = new CountDownLatch(2);
        Subscription sub1 = worker.schedule(latch::countDown, 40L, TimeUnit.MILLISECONDS);
        Subscription sub2 = worker.schedule(latch::countDown, 40L, TimeUnit.MILLISECONDS);
        worker.unsubscribe();
        this.assertTrue(sub1.isUnsubscribed());
        this.assertTrue(sub2.isUnsubscribed());
        this.assertFalse(latch.await(40L, TimeUnit.MILLISECONDS));
        this.assertEquals(2L, latch.getCount());
    }

    @Test
    public void testPeriodicRescheduleAfterActionBlocking() {
        ContextScheduler scheduler2 = new ContextScheduler(this.vertx, true);
        Scheduler.Worker worker = scheduler2.createWorker();
        AtomicBoolean b = new AtomicBoolean();
        long time = System.currentTimeMillis();
        worker.schedulePeriodically(() -> {
            if (b.compareAndSet(false, true)) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException e) {
                    this.fail();
                }
            } else {
                this.assertTrue(System.currentTimeMillis() - time > 50L);
                worker.unsubscribe();
                this.testComplete();
            }
        }, 20L, 20L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testSchedulerHook() throws Exception {
        this.testSchedulerHook(false);
    }

    @Test
    public void testSchedulerHookBlocking() throws Exception {
        this.testSchedulerHook(true);
    }

    private void testSchedulerHook(boolean blocking) throws Exception {
        RxJavaPlugins plugins = RxJavaPlugins.getInstance();
        final AtomicInteger scheduled = new AtomicInteger();
        final AtomicInteger called = new AtomicInteger();
        final CountDownLatch latchCalled = new CountDownLatch(1);
        plugins.registerSchedulersHook(new RxJavaSchedulersHook(){

            public Action0 onSchedule(Action0 action) {
                scheduled.incrementAndGet();
                return () -> {
                    action.call();
                    called.getAndIncrement();
                    latchCalled.countDown();
                };
            }
        });
        ContextScheduler scheduler = new ContextScheduler(this.vertx, blocking);
        Scheduler.Worker worker = scheduler.createWorker();
        this.assertEquals(0L, scheduled.get());
        this.assertEquals(0L, called.get());
        CountDownLatch latch = new CountDownLatch(1);
        worker.schedule(() -> {
            latch.countDown();
            this.assertEquals(1L, scheduled.get());
            this.assertEquals(0L, called.get());
        }, 0L, TimeUnit.SECONDS);
        this.awaitLatch(latch);
        this.awaitLatch(latchCalled);
        this.assertEquals(1L, scheduled.get());
        this.assertEquals(1L, called.get());
    }
}

