为什么 Completable 和 Observable 之间的订阅时间副作用安排不同?

我目前正在使用 Rx 1。


我有以下测试用例:


static void printThread(String format, Object... objects) {

    System.out.println(String.format("%s %s", Thread.currentThread().getName(),

            String.format(format, objects)));

}


public void testFoo() throws InterruptedException {

    Observable.fromCallable(() -> { printThread("callable"); return 1L;})

              .subscribeOn(Schedulers.newThread())

              .doOnSubscribe(() -> printThread("A"))

              .doOnSubscribe(() -> printThread("B"))

              .subscribeOn(Schedulers.newThread())

              .doOnSubscribe(() -> printThread("C"))

              .subscribeOn(Schedulers.newThread())

              .doOnSubscribe(() -> printThread("D"))

              .toBlocking()

              .subscribe();


    printThread("next!");


    Completable.fromCallable(() -> { printThread("callable"); Thread.sleep(10_000); return 1L;})

              .subscribeOn(Schedulers.newThread())

              .doOnSubscribe(a -> printThread("A"))

              .doOnSubscribe(a -> printThread("B"))

              .subscribeOn(Schedulers.newThread())

              .doOnSubscribe(a -> printThread("C"))

              .subscribeOn(Schedulers.newThread())

              .doOnSubscribe(a -> printThread("D"))

               .andThen(Completable.fromAction(() -> printThread("E")))

               .andThen(Completable.fromAction(() -> printThread("F")).subscribeOn(Schedulers.newThread()))

               .await();

}

产生以下输出:


main D

RxNewThreadScheduler-1 C

RxNewThreadScheduler-2 B

RxNewThreadScheduler-2 A

RxNewThreadScheduler-3 callable

main next!

RxNewThreadScheduler-6 A

RxNewThreadScheduler-6 B

RxNewThreadScheduler-6 C

RxNewThreadScheduler-6 D

RxNewThreadScheduler-6 callable

RxNewThreadScheduler-6 E

RxNewThreadScheduler-7 F


Process finished with exit code 0

为什么订阅时间副作用的调度在Observable和之间的工作方式存在差异Completable?


我认为发生的事情是对于可观察的行为是产生的,因为订阅发生在默认的单线程调度模式下,除了 wheresubscribeOn()被调用,这就是为什么 A 和 B 发生在同一个线程上,而其他一切都发生在不同的线程上。


但我不明白为什么这种行为会被 Completable 改变。


慕盖茨4494581
浏览 209回答 1
1回答

杨魅力

RxJava 1 在这方面有点不一致。1.x在订阅时线程切换之后Completable.subscribeOn调用onSubscribe,而 withObservable则doOnSubscribe在线程切换到上游之前调用。使用 RxJava 2,它们现在是一致的:main DRxNewThreadScheduler-1 CRxNewThreadScheduler-2 ARxNewThreadScheduler-2 BRxNewThreadScheduler-3 callablemain next!main DRxNewThreadScheduler-4 CRxNewThreadScheduler-5 ARxNewThreadScheduler-5 BRxNewThreadScheduler-6 callable
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java