为什么 toMap 的错误放置会导致 RXJava 出现问题?

更新:我正在使用 RxJava 1.x


这是以下代码段:


private static void tryObservableToMap() {

    bad();

    good();

}


private static void good() {

    System.out.println("GOOD CASE");

    String goodOutput =

            m(m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")

            .distinct(), "distinct")

            .flatMap(s ->

                    m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)

                            .map(intValue -> Pair.of(s, intValue)), "pair " + s)), "flatMap")

            .toMap(Pair::getKey, Pair::getValue), "toMap")

            .map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")

            .toBlocking()

            .first();



    try {

        Thread.sleep(2000);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }


    System.out.println("\nOutput:");

    System.out.println(goodOutput);

}


private static void bad() {

    System.out.println("BAD CASE");

    String badOutput =

            m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")

            .distinct(), "distinct")

            .flatMap(s ->

                    m(m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)

                            .map(intValue -> Pair.of(s, intValue)), "pair " + s)

                            .toMap(Pair::getKey, Pair::getValue), "toMap " + s)), "flatMap")

            .map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")

            .toBlocking()

            .first();



    try {

        Thread.sleep(2000);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }


    System.out.println("\nOutput:");

    System.out.println(badOutput);

}



好与坏的区别在于,对于坏版本,我是.toMap在内部调用.flatMap而不是在.flatMap.


如果您运行此代码,您将看到作为执行一部分的所有 observable 的不同事件。


我想知道为什么“外部”可观察对象永远不会因坏情况而终止。对RX有更深了解的人可以解释一下吗?


森栏
浏览 113回答 1
1回答

慕田峪7331174

该RXLOG OUTER observable complete丢失是因为有之间的竞争toBlocking().first()和它上面完成的来源。它可能会过早取消订阅,因此上面的源可能没有机会发出onCompleted. 在我的 i7 4770K 上,他们从不completed为我打印。如果您替换first为toIterable().iterator().next(),它将提供必要的机会,您应该始终看到丢失的日志。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java