使用组的通量并行串行执行

假设我有这个:


Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)

   .groupBy(i -> i % 3);

并说我有一个方法:


Mono<Integer> getFromService(Integer i);

我想为每个组并行呼叫,但请确保每个组中的呼叫是串行的。getFromService


对于上面的示例,这将是具有这些输入值的三个并行流:


stream 1: 0 -> 3 -> 6 -> 9

stream 2: 1 -> 4 -> 7 -> 10

stream 3: 2 -> 5 -> 8 -> 11

我试过这个,但它没有做我想做的事:


Flux.range(0, 12)

   .groupBy(i -> i % 3)

   .flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))

这是一次并行调用所有整数的服务。我该如何继续?


GCT1015
浏览 164回答 1
1回答

慕斯709654

使用“连接映射”或“平面映射”序列性而不是内部.flatMap如果要在每个组中按顺序执行(即每个组中一次只有一个订阅),请使用 ,如下所示:getFromService.concatMapFlux.range(0,&nbsp;12) &nbsp;&nbsp;&nbsp;.groupBy(i&nbsp;->&nbsp;i&nbsp;%&nbsp;3) &nbsp;&nbsp;&nbsp;.flatMap(g&nbsp;->&nbsp;g.concatMap(i&nbsp;->&nbsp;getFromService(g.key(),&nbsp;i)))如果组内的并行执行是可以的,但您只关心序列的发出顺序,则使用 ,如下所示:flatMapSequentialFlux.range(0,&nbsp;12) &nbsp;&nbsp;&nbsp;&nbsp;.groupBy(i&nbsp;->&nbsp;i&nbsp;%&nbsp;3) &nbsp;&nbsp;&nbsp;&nbsp;.flatMap(g&nbsp;->&nbsp;g.flatMapSequential(i&nbsp;->&nbsp;getFromService(g.key(),&nbsp;i)))另一种选择是将参数设置为 使用,但我建议使用上述方法之一。.flatMapconcurrency1
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java