/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rx.java.test;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.lang.rx.test.ReadStreamAdapterBackPressureTest;
import io.vertx.lang.rx.test.TestSubscriber;
import io.vertx.rx.java.ObservableReadStream;
import io.vertx.rx.java.RxHelper;
import io.vertx.rx.java.test.TestUtils;
import io.vertx.test.fakestream.FakeStream;
import java.util.function.Function;
import org.junit.Test;
import rx.Observable;

/**
 * Runs the shared {@link ReadStreamAdapterBackPressureTest} suite against the RxJava
 * {@link Observable} adapter, and adds two checks on the request counter exposed by
 * {@link ObservableReadStream#getRequested()}.
 */
public class ObservableReadStreamAdapterBackPressureTest
extends ReadStreamAdapterBackPressureTest<Observable<Buffer>> {
    /** Largest request amount that is still one short of {@code Long.MAX_VALUE}. */
    private static final long ONE_BELOW_MAX = Long.MAX_VALUE - 1;

    /**
     * @return the buffer size the shared suite should use for this adapter (256);
     *         presumably mirrors the adapter's own default - TODO confirm against
     *         {@link ObservableReadStream}
     */
    @Override
    protected long defaultMaxBufferSize() {
        return 256L;
    }

    /**
     * Adapts {@code stream} through {@link RxHelper#toObservable} with an explicit buffer size.
     *
     * @param stream        the stream to adapt
     * @param maxBufferSize the buffer size forwarded to {@code RxHelper}
     * @return the observable produced by {@code RxHelper}
     */
    @Override
    protected Observable<Buffer> toObservable(ReadStream<Buffer> stream, int maxBufferSize) {
        return RxHelper.toObservable(stream, maxBufferSize);
    }

    /**
     * Adapts {@code stream} through {@link RxHelper#toObservable} without specifying a buffer size.
     *
     * @param stream the stream to adapt
     * @return the observable produced by {@code RxHelper}
     */
    @Override
    protected Observable<Buffer> toObservable(ReadStream<Buffer> stream) {
        return RxHelper.toObservable(stream);
    }

    /**
     * Subscribes {@code sub} to {@code obs} by delegating to {@link TestUtils#subscribe}.
     *
     * @param obs the observable under test
     * @param sub the subscriber receiving its events
     */
    @Override
    protected void subscribe(Observable<Buffer> obs, TestSubscriber<Buffer> sub) {
        TestUtils.subscribe(obs, sub);
    }

    /**
     * @param obs1 the first observable
     * @param obs2 the second observable
     * @return {@code Observable.concat(obs1, obs2)}
     */
    @Override
    protected Observable<Buffer> concat(Observable<Buffer> obs1, Observable<Buffer> obs2) {
        return Observable.concat(obs1, obs2);
    }

    /**
     * Flat-maps {@code obs} with {@code f}.
     *
     * @param obs the source observable
     * @param f   the mapping function, bridged to the RxJava function type via {@code f::apply}
     * @return the flat-mapped observable
     */
    @Override
    protected Observable<Buffer> flatMap(Observable<Buffer> obs, Function<Buffer, Observable<Buffer>> f) {
        return obs.flatMap(f::apply);
    }

    /**
     * With a subscriber on which no prefetch is configured, the adapter is expected to report
     * {@code Long.MAX_VALUE} requested items both right after subscription and after one
     * item has been emitted.
     */
    @Test
    public void testDisableBackPressure() {
        FakeStream<Buffer> stream = new FakeStream<>();
        ObservableReadStream<Buffer, Buffer> adapter = new ObservableReadStream<>(stream, Function.identity());
        Observable<Buffer> observable = Observable.create(adapter);
        TestSubscriber<Buffer> subscriber = new TestSubscriber<>();
        TestUtils.subscribe(observable, subscriber);
        assertEquals(Long.MAX_VALUE, adapter.getRequested());
        stream.emit(buffer("0"));
        assertEquals(Long.MAX_VALUE, adapter.getRequested());
    }

    /**
     * Starts with a prefetch of {@code Long.MAX_VALUE - 1}, then requests 2 more items from
     * within {@code onNext}. After one emitted item the adapter is expected to report exactly
     * {@code Long.MAX_VALUE}.
     */
    @Test
    public void testImplicitBackPressureActivation() {
        FakeStream<Buffer> stream = new FakeStream<>();
        ObservableReadStream<Buffer, Buffer> adapter = new ObservableReadStream<>(stream, Function.identity());
        Observable<Buffer> observable = Observable.create(adapter);
        TestSubscriber<Buffer> subscriber = new TestSubscriber<Buffer>() {
            @Override
            public void onNext(Buffer o) {
                super.onNext(o);
                // Ask for more than one item so the outstanding demand grows past ONE_BELOW_MAX;
                // presumably the adapter saturates the counter at Long.MAX_VALUE - verify in adapter.
                request(2L);
            }
        }.prefetch(ONE_BELOW_MAX);
        TestUtils.subscribe(observable, subscriber);
        assertEquals(ONE_BELOW_MAX, adapter.getRequested());
        stream.emit(buffer("0"));
        assertEquals(Long.MAX_VALUE, adapter.getRequested());
    }
}

