RxJava 是否适合分支工作流?

我正在使用 RxJava 来处理我们从队列中提取的一些通知。

RxJava 似乎在一个简单的工作流中工作得很好,现在随着新需求的出现,流程变得越来越复杂,分支越来越多(请参见下图作为参考) 

http://img4.mukewang.com/60b73a6e0001879c19110480.jpg

我试图用一个小单元测试来举例说明流程:


@Test

public void test() {

    Observable.range(1, 100)

        .groupBy(n -> n % 3)

        .toMap(GroupedObservable::getKey)

        .flatMap(m1 -> {

            Observable<Integer> ones1 = m1.get(0);

            Observable<Integer> twos1 = m1.get(1).map(n -> n - 10);

            Observable<Integer> threes = m1.get(2).map(n -> n + 100);

            Observable<Integer> onesAndTwos = Observable.merge(ones1, twos1)

                .map(n -> n * 3)

                .groupBy(n -> n % 2)

                .toMap(GroupedObservable::getKey)

                .flatMap(m2 -> {

                    Observable<Integer> ones2 = m2.get(0).map(n -> n * 10);

                    Observable<Integer> twos2 = m2.get(1).map(n -> n * 100);

                    return Observable.merge(ones2, twos2);

                });

                return Observable.merge(onesAndTwos, threes).map(n -> n +1);

        })

        .subscribe(System.out::println);

}

虽然使用 RxJava 在技术上仍然可以实现,但我现在想知道它是否是一个不错的选择,因为我必须在 main 中进行 2 级嵌套flatMap,这看起来不太整洁。


这是描述上述工作流程的正确方式吗?或者 RxJava 不适合分支工作流?


莫回无
浏览 317回答 2
2回答

HUX布斯

分组 observable 是 AFAIK 的正确方法。就个人而言,如果您图片中“按类型拆分”和“合并所有内容”之间的任何内容是异步的,那么在 RX 中执行此操作肯定有很多优点,例如重试逻辑、缓冲、错误处理、背压等。如果它是常规的非异步代码,我猜这是个人喜好。您可以使用 RX 来完成,但您也可以使用常规同步代码在“按类型拆分”和“合并所有内容”之间执行所有操作。无论您选择哪种方式,拆分代码以使其更具可读性始终是一个好主意,这样您就可以像阅读您附加的图像一样轻松地“阅读流程”。

牛魔王的故事

只是另一种可能适合您的方法的想法:您可以多播源并单独处理分支,而不是分组/toMap。例子:@Testpublic void multicastingShare() {&nbsp; &nbsp; final Observable<Integer> sharedSource = Observable.range(1, 10)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .doOnSubscribe(dummy -> System.out.println("subscribed"))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .share();&nbsp; &nbsp; // split by some criteria&nbsp; &nbsp; final Observable<String> oddItems = sharedSource&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .filter(n -> n % 2 == 1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(odd -> "odd: " + odd)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .doOnNext(System.out::println);&nbsp; &nbsp; final Observable<String> evenItems = sharedSource&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .filter(n -> n % 2 == 0)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(even -> "even: " + even)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .doOnNext(System.out::println);&nbsp; &nbsp; // recombine the individual streams at some point&nbsp; &nbsp; Observable.concat(oddItems, evenItems)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .subscribe(result -> System.out.println("result: " + result));}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java