我目前正在使用 Rx 1。
我有以下测试用例:
static void printThread(String format, Object... objects) {
System.out.println(String.format("%s %s", Thread.currentThread().getName(),
String.format(format, objects)));
}
public void testFoo() throws InterruptedException {
Observable.fromCallable(() -> { printThread("callable"); return 1L;})
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> printThread("A"))
.doOnSubscribe(() -> printThread("B"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> printThread("C"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> printThread("D"))
.toBlocking()
.subscribe();
printThread("next!");
Completable.fromCallable(() -> { printThread("callable"); Thread.sleep(10_000); return 1L;})
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(a -> printThread("A"))
.doOnSubscribe(a -> printThread("B"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(a -> printThread("C"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(a -> printThread("D"))
.andThen(Completable.fromAction(() -> printThread("E")))
.andThen(Completable.fromAction(() -> printThread("F")).subscribeOn(Schedulers.newThread()))
.await();
}
产生以下输出:
main D
RxNewThreadScheduler-1 C
RxNewThreadScheduler-2 B
RxNewThreadScheduler-2 A
RxNewThreadScheduler-3 callable
main next!
RxNewThreadScheduler-6 A
RxNewThreadScheduler-6 B
RxNewThreadScheduler-6 C
RxNewThreadScheduler-6 D
RxNewThreadScheduler-6 callable
RxNewThreadScheduler-6 E
RxNewThreadScheduler-7 F
Process finished with exit code 0
为什么订阅时间副作用的调度在Observable和之间的工作方式存在差异Completable?
我认为发生的事情是对于可观察的行为是产生的,因为订阅发生在默认的单线程调度模式下,除了 wheresubscribeOn()被调用,这就是为什么 A 和 B 发生在同一个线程上,而其他一切都发生在不同的线程上。
但我不明白为什么这种行为会被 Completable 改变。
杨魅力
相关分类