Reactor: expanding a ParallelFlux

I have a set of items that need to be expanded. Because the expansion requires IO operations, I chose Reactor for its reactive capabilities.


Here is a piece of working code:


public Flux<Item> expand(List<Item> unprocessedItems) {
  return Flux.fromIterable(unprocessedItems)
    .expandDeep(this::expandItem);
}

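Note that this::expandItem is a blocking operation (several database queries, some computation, ...). Now I would like this expansion to run in parallel, but as far as I can tell .expand() and .expandDeep() are only members of the Flux class, not of the ParallelFlux class. I tried adding .publishOn() and .subscribeOn() before the .expand() call, but with no luck.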


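This is my first time using Reactor, but I don't see any technical reason that would prevent a parallel expansion. Is there a way to do it? Is the API missing, or am I missing something?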


摇曳的蔷薇
2 answers

狐的传说

Yes, you are right: ParallelFlux has no .expand() or .expandDeep() methods. You can work around that, though: create an additional Publisher that does the expansion and pass it to your ParallelFlux, like this:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public static void main(String[] args) {
    // Expands a node into its direct children.
    Function<Node, Flux<Node>> expander =
        node -> Flux.fromIterable(node.children);

    List<Node> roots = createTestNodes();

    Flux.fromIterable(roots)
        .parallel(4)
        .runOn(Schedulers.parallel())
        // Each rail expands its own root depth-first.
        .flatMap(node -> Flux.just(node).expandDeep(expander))
        .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))
        .sequential()
        .subscribe();

    // Give the worker threads time to finish before main exits.
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("finished");
}

My test data:

static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... nodes) {
        this.name = name;
        this.children = new ArrayList<>();
        children.addAll(Arrays.asList(nodes));
    }

    @Override
    public String toString() {
        return name;
    }
}

static List<Node> createTestNodes() {
    // Only the children of "root" are used as the starting roots.
    return new Node("root",
        new Node("1",
            new Node("11")
        ),
        new Node("2",
            new Node("21"),
            new Node("22",
                new Node("221")
            )
        ),
        new Node("3",
            new Node("31"),
            new Node("32",
                new Node("321")
            ),
            new Node("33",
                new Node("331"),
                new Node("332",
                    new Node("3321")
                )
            )
        ),
        new Node("4",
            new Node("41"),
            new Node("42",
                new Node("421")
            ),
            new Node("43",
                new Node("431"),
                new Node("432",
                    new Node("4321")
                )
            ),
            new Node("44",
                new Node("441"),
                new Node("442",
                    new Node("4421")
                ),
                new Node("443",
                    new Node("4431"),
                    new Node("4432")
                )
            )
        )
    ).children;
}

Result:

Time: 1549296674522 thread: parallel-4 value: 4
Time: 1549296674523 thread: parallel-4 value: 41
Time: 1549296674523 thread: parallel-2 value: 2
Time: 1549296674523 thread: parallel-2 value: 21
Time: 1549296674523 thread: parallel-3 value: 3
Time: 1549296674523 thread: parallel-3 value: 31
Time: 1549296674523 thread: parallel-1 value: 1
Time: 1549296674523 thread: parallel-1 value: 11
Time: 1549296674525 thread: parallel-2 value: 22
Time: 1549296674525 thread: parallel-2 value: 221
Time: 1549296674526 thread: parallel-3 value: 32
Time: 1549296674526 thread: parallel-3 value: 321
Time: 1549296674526 thread: parallel-3 value: 33
Time: 1549296674526 thread: parallel-3 value: 331
Time: 1549296674526 thread: parallel-3 value: 332
Time: 1549296674526 thread: parallel-3 value: 3321
Time: 1549296674526 thread: parallel-4 value: 42
Time: 1549296674526 thread: parallel-4 value: 421
Time: 1549296674526 thread: parallel-4 value: 43
Time: 1549296674526 thread: parallel-4 value: 431
Time: 1549296674526 thread: parallel-4 value: 432
Time: 1549296674526 thread: parallel-4 value: 4321
Time: 1549296674527 thread: parallel-4 value: 44
Time: 1549296674527 thread: parallel-4 value: 441
Time: 1549296674527 thread: parallel-4 value: 442
Time: 1549296674527 thread: parallel-4 value: 4421
Time: 1549296674528 thread: parallel-4 value: 443
Time: 1549296674528 thread: parallel-4 value: 4431
Time: 1549296674528 thread: parallel-4 value: 4432

As you can see, expander runs on parallel threads.
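One thing the result listing suggests: each root's whole subtree stays on a single thread (everything under 4 is printed by parallel-4, everything under 3 by parallel-3), so the parallelism here is per root rather than per node. Below is a minimal plain-JDK sketch of that same partitioning, with no Reactor involved. The class name PerRootExpansion, its Node stand-in, and the helpers walk, expandRoots and sampleRoots are all hypothetical names made up for illustration, and a fixed thread pool is swapped in for Reactor's scheduler. It only mimics the shape of the work split, not how ParallelFlux is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PerRootExpansion {
    // Hypothetical stand-in for the Node class used in the answer.
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    // Depth-first, pre-order walk of one subtree on the calling thread.
    static void walk(Node node, List<String> out) {
        out.add(node.name);
        for (Node child : node.children) {
            walk(child, out);
        }
    }

    // One task per root: roots run concurrently, each subtree stays sequential.
    static List<List<String>> expandRoots(List<Node> roots, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (Node root : roots) {
                Callable<List<String>> task = () -> {
                    List<String> visited = new ArrayList<>();
                    walk(root, visited);
                    return visited;
                };
                pending.add(pool.submit(task));
            }
            // Collect results in root order, regardless of completion order.
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> future : pending) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // A small slice of the test tree from the answer (roots 1 and 2 only).
    static List<Node> sampleRoots() {
        return Arrays.asList(
            new Node("1", new Node("11")),
            new Node("2", new Node("21"), new Node("22", new Node("221"))));
    }

    public static void main(String[] args) {
        System.out.println(expandRoots(sampleRoots(), 2));
    }
}
```

The practical consequence is that a single root with a very large subtree would still be expanded sequentially under this scheme, so it tends to help most when there are many roots of similar size.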

ibeautiful

Here is an example, based on the one given by YauhenBalykin:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public static void main(String[] args) {
    // Each expansion subscribes on the parallel scheduler.
    Function<Node, Flux<Node>> expander =
        node -> Flux.fromIterable(node.children)
            .subscribeOn(Schedulers.parallel());

    List<Node> roots = createTestNodes();

    Flux.fromIterable(roots)
        .expand(expander)
        .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis() + " thread: " + Thread.currentThread().getName() + " value: " + i))
        .subscribe();

    // Give the worker threads time to finish before main exits.
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("finished");
}

Test data:

static final class Node {
    final String name;
    final List<Node> children;

    Node(String name, Node... nodes) {
        this.name = name;
        this.children = new ArrayList<>();
        children.addAll(Arrays.asList(nodes));
    }

    @Override
    public String toString() {
        return name;
    }
}

static List<Node> createTestNodes() {
    // Only the children of "root" are used as the starting roots.
    return new Node("root",
        new Node("1",
            new Node("11")
        ),
        new Node("2",
            new Node("21"),
            new Node("22",
                new Node("221")
            )
        ),
        new Node("3",
            new Node("31"),
            new Node("32",
                new Node("321")
            ),
            new Node("33",
                new Node("331"),
                new Node("332",
                    new Node("3321")
                )
            )
        ),
        new Node("4",
            new Node("41"),
            new Node("42",
                new Node("421")
            ),
            new Node("43",
                new Node("431"),
                new Node("432",
                    new Node("4321")
                )
            ),
            new Node("44",
                new Node("441"),
                new Node("442",
                    new Node("4421")
                ),
                new Node("443",
                    new Node("4431"),
                    new Node("4432")
                )
            )
        )
    ).children;
}

Result:

Time: 1636182895717 thread: main value: 1
Time: 1636182895754 thread: main value: 2
Time: 1636182895754 thread: main value: 3
Time: 1636182895754 thread: main value: 4
Time: 1636182895761 thread: parallel-1 value: 11
Time: 1636182895761 thread: parallel-2 value: 21
Time: 1636182895761 thread: parallel-2 value: 22
Time: 1636182895762 thread: parallel-3 value: 31
Time: 1636182895762 thread: parallel-3 value: 32
Time: 1636182895762 thread: parallel-3 value: 33
Time: 1636182895762 thread: parallel-4 value: 41
Time: 1636182895762 thread: parallel-4 value: 42
Time: 1636182895762 thread: parallel-4 value: 43
Time: 1636182895762 thread: parallel-4 value: 44
Time: 1636182895764 thread: parallel-7 value: 221
Time: 1636182895764 thread: parallel-9 value: 321
Time: 1636182895764 thread: parallel-10 value: 331
Time: 1636182895765 thread: parallel-10 value: 332
Time: 1636182895765 thread: parallel-12 value: 421
Time: 1636182895765 thread: parallel-1 value: 431
Time: 1636182895765 thread: parallel-1 value: 432
Time: 1636182895766 thread: parallel-2 value: 441
Time: 1636182895766 thread: parallel-2 value: 442
Time: 1636182895766 thread: parallel-2 value: 443
Time: 1636182895766 thread: parallel-6 value: 3321
Time: 1636182895767 thread: parallel-9 value: 4321
Time: 1636182895767 thread: parallel-11 value: 4421
Time: 1636182895767 thread: parallel-12 value: 4431
Time: 1636182895767 thread: parallel-12 value: 4432
finished
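The two result listings on this page also differ in emission order, not only in thread names. The first answer uses expandDeep and this one uses expand, and Reactor documents expand as a breadth-first traversal and expandDeep as a depth-first one. To make that difference concrete, here is a single-threaded plain-Java sketch that mimics only the ordering. The class TraversalOrder, its Node stand-in, and the methods breadthFirst, depthFirst and sampleRoots are hypothetical names for illustration. It uses no Reactor and no concurrency, so it says nothing about which thread emits what.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class TraversalOrder {
    // Hypothetical stand-in for the Node class used in the answers.
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    // Level by level: the order expand() is documented to follow.
    static List<String> breadthFirst(List<Node> roots) {
        List<String> out = new ArrayList<>();
        Deque<Node> queue = new ArrayDeque<>(roots);
        while (!queue.isEmpty()) {
            Node node = queue.removeFirst();
            out.add(node.name);
            queue.addAll(node.children); // children wait behind the current level
        }
        return out;
    }

    // Pre-order, one branch at a time: the order expandDeep() is documented to follow.
    static List<String> depthFirst(List<Node> roots) {
        List<String> out = new ArrayList<>();
        Deque<Node> stack = new ArrayDeque<>();
        // Push in reverse so the first root ends up on top of the stack.
        for (int i = roots.size() - 1; i >= 0; i--) {
            stack.push(roots.get(i));
        }
        while (!stack.isEmpty()) {
            Node node = stack.pop();
            out.add(node.name);
            for (int i = node.children.size() - 1; i >= 0; i--) {
                stack.push(node.children.get(i));
            }
        }
        return out;
    }

    // A small slice of the test tree from the answers (roots 1 and 2 only).
    static List<Node> sampleRoots() {
        return Arrays.asList(
            new Node("1", new Node("11")),
            new Node("2", new Node("21"), new Node("22", new Node("221"))));
    }

    public static void main(String[] args) {
        System.out.println(breadthFirst(sampleRoots())); // [1, 2, 11, 21, 22, 221]
        System.out.println(depthFirst(sampleRoots()));   // [1, 11, 2, 21, 22, 221]
    }
}
```

This matches the shape of the result above, where the roots 1 to 4 appear first, then all two-digit nodes, then the three-digit ones, and finally the four-digit ones.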