狐的传说
Yes, you are right: ParallelFlux has no .expand() or .expandDeep() methods. You can work around this by creating an additional Publisher that does have the expand method and passing it to your ParallelFlux, like this:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;

    public static void main(String[] args) {
        // Maps a node to a Flux of its direct children.
        Function<Node, Flux<Node>> expander =
                node -> Flux.fromIterable(node.children);

        List<Node> roots = createTestNodes();

        Flux.fromIterable(roots)
                .parallel(4)
                .runOn(Schedulers.parallel())
                // Each root is expanded depth-first on the rail it was assigned to.
                .flatMap(node -> Flux.just(node).expandDeep(expander))
                .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis()
                        + " thread: " + Thread.currentThread().getName()
                        + " value: " + i))
                .sequential()
                .subscribe();

        // Keep the main thread alive long enough for the parallel rails to finish.
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("finished");
    }

My test data:

    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... nodes) {
            this.name = name;
            this.children = new ArrayList<>();
            children.addAll(Arrays.asList(nodes));
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Builds a tree under a synthetic "root" node and returns its four children.
    static List<Node> createTestNodes() {
        return new Node("root",
                new Node("1",
                        new Node("11")
                ),
                new Node("2",
                        new Node("21"),
                        new Node("22",
                                new Node("221")
                        )
                ),
                new Node("3",
                        new Node("31"),
                        new Node("32",
                                new Node("321")
                        ),
                        new Node("33",
                                new Node("331"),
                                new Node("332",
                                        new Node("3321")
                                )
                        )
                ),
                new Node("4",
                        new Node("41"),
                        new Node("42",
                                new Node("421")
                        ),
                        new Node("43",
                                new Node("431"),
                                new Node("432",
                                        new Node("4321")
                                )
                        ),
                        new Node("44",
                                new Node("441"),
                                new Node("442",
                                        new Node("4421")
                                ),
                                new Node("443",
                                        new Node("4431"),
                                        new Node("4432")
                                )
                        )
                )
        ).children;
    }

Result:

    Time: 1549296674522 thread: parallel-4 value: 4
    Time: 1549296674523 thread: parallel-4 value: 41
    Time: 1549296674523 thread: parallel-2 value: 2
    Time: 1549296674523 thread: parallel-2 value: 21
    Time: 1549296674523 thread: parallel-3 value: 3
    Time: 1549296674523 thread: parallel-3 value: 31
    Time: 1549296674523 thread: parallel-1 value: 1
    Time: 1549296674523 thread: parallel-1 value: 11
    Time: 1549296674525 thread: parallel-2 value: 22
    Time: 1549296674525 thread: parallel-2 value: 221
    Time: 1549296674526 thread: parallel-3 value: 32
    Time: 1549296674526 thread: parallel-3 value: 321
    Time: 1549296674526 thread: parallel-3 value: 33
    Time: 1549296674526 thread: parallel-3 value: 331
    Time: 1549296674526 thread: parallel-3 value: 332
    Time: 1549296674526 thread: parallel-3 value: 3321
    Time: 1549296674526 thread: parallel-4 value: 42
    Time: 1549296674526 thread: parallel-4 value: 421
    Time: 1549296674526 thread: parallel-4 value: 43
    Time: 1549296674526 thread: parallel-4 value: 431
    Time: 1549296674526 thread: parallel-4 value: 432
    Time: 1549296674526 thread: parallel-4 value: 4321
    Time: 1549296674527 thread: parallel-4 value: 44
    Time: 1549296674527 thread: parallel-4 value: 441
    Time: 1549296674527 thread: parallel-4 value: 442
    Time: 1549296674527 thread: parallel-4 value: 4421
    Time: 1549296674528 thread: parallel-4 value: 443
    Time: 1549296674528 thread: parallel-4 value: 4431
    Time: 1549296674528 thread: parallel-4 value: 4432

As you can see, the expander runs on the parallel threads: each root and its whole subtree are printed from the same parallel-N thread.
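Within each thread the values above come out depth-first (a node, then its whole subtree, then the next sibling), which is the order expandDeep is documented to use; expand would instead go level by level. As a rough illustration of the two orders, here is a minimal plain-Java sketch with no Reactor dependency. The class ExpandOrderSketch, its Node type, and the helper names are hypothetical and only model the emission order on a small subset of the test tree; this is not how Reactor implements either operator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-alone model of traversal order; it does not use Reactor.
final class ExpandOrderSketch {

    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... nodes) {
            this.name = name;
            this.children = new ArrayList<>(Arrays.asList(nodes));
        }
    }

    // Depth-first (pre-order): a node, then its entire subtree, then the next sibling.
    static List<String> depthFirst(List<Node> roots) {
        List<String> out = new ArrayList<>();
        for (Node root : roots) {
            visit(root, out);
        }
        return out;
    }

    private static void visit(Node node, List<String> out) {
        out.add(node.name);
        for (Node child : node.children) {
            visit(child, out);
        }
    }

    // Breadth-first: all roots, then all their children, then all grandchildren.
    static List<String> breadthFirst(List<Node> roots) {
        List<String> out = new ArrayList<>();
        Deque<Node> queue = new ArrayDeque<>(roots);
        while (!queue.isEmpty()) {
            Node node = queue.removeFirst();
            out.add(node.name);
            queue.addAll(node.children);
        }
        return out;
    }

    // A small subset of the test tree: "1" -> "11" and "2" -> "21", "22" -> "221".
    static List<Node> sampleRoots() {
        return Arrays.asList(
                new Node("1", new Node("11")),
                new Node("2", new Node("21"), new Node("22", new Node("221"))));
    }

    public static void main(String[] args) {
        System.out.println("depth-first:   " + depthFirst(sampleRoots()));
        System.out.println("breadth-first: " + breadthFirst(sampleRoots()));
    }
}
```

For the sample roots the depth-first order is 1, 11, 2, 21, 22, 221 and the breadth-first order is 1, 2, 11, 21, 22, 221.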
ibeautiful
Here is an example based on the one given by Yauhen Balykin:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;

    public static void main(String[] args) {
        // Maps a node to a Flux of its direct children; each such inner Flux
        // is subscribed on the parallel scheduler.
        Function<Node, Flux<Node>> expander =
                node -> Flux.fromIterable(node.children)
                        .subscribeOn(Schedulers.parallel());

        List<Node> roots = createTestNodes();

        Flux.fromIterable(roots)
                .expand(expander)
                .doOnNext(i -> System.out.println("Time: " + System.currentTimeMillis()
                        + " thread: " + Thread.currentThread().getName()
                        + " value: " + i))
                .subscribe();

        // Keep the main thread alive long enough for the expansion to finish.
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("finished");
    }

Test data:

    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... nodes) {
            this.name = name;
            this.children = new ArrayList<>();
            children.addAll(Arrays.asList(nodes));
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Builds a tree under a synthetic "root" node and returns its four children.
    static List<Node> createTestNodes() {
        return new Node("root",
                new Node("1",
                        new Node("11")
                ),
                new Node("2",
                        new Node("21"),
                        new Node("22",
                                new Node("221")
                        )
                ),
                new Node("3",
                        new Node("31"),
                        new Node("32",
                                new Node("321")
                        ),
                        new Node("33",
                                new Node("331"),
                                new Node("332",
                                        new Node("3321")
                                )
                        )
                ),
                new Node("4",
                        new Node("41"),
                        new Node("42",
                                new Node("421")
                        ),
                        new Node("43",
                                new Node("431"),
                                new Node("432",
                                        new Node("4321")
                                )
                        ),
                        new Node("44",
                                new Node("441"),
                                new Node("442",
                                        new Node("4421")
                                ),
                                new Node("443",
                                        new Node("4431"),
                                        new Node("4432")
                                )
                        )
                )
        ).children;
    }

Result:

    Time: 1636182895717 thread: main value: 1
    Time: 1636182895754 thread: main value: 2
    Time: 1636182895754 thread: main value: 3
    Time: 1636182895754 thread: main value: 4
    Time: 1636182895761 thread: parallel-1 value: 11
    Time: 1636182895761 thread: parallel-2 value: 21
    Time: 1636182895761 thread: parallel-2 value: 22
    Time: 1636182895762 thread: parallel-3 value: 31
    Time: 1636182895762 thread: parallel-3 value: 32
    Time: 1636182895762 thread: parallel-3 value: 33
    Time: 1636182895762 thread: parallel-4 value: 41
    Time: 1636182895762 thread: parallel-4 value: 42
    Time: 1636182895762 thread: parallel-4 value: 43
    Time: 1636182895762 thread: parallel-4 value: 44
    Time: 1636182895764 thread: parallel-7 value: 221
    Time: 1636182895764 thread: parallel-9 value: 321
    Time: 1636182895764 thread: parallel-10 value: 331
    Time: 1636182895765 thread: parallel-10 value: 332
    Time: 1636182895765 thread: parallel-12 value: 421
    Time: 1636182895765 thread: parallel-1 value: 431
    Time: 1636182895765 thread: parallel-1 value: 432
    Time: 1636182895766 thread: parallel-2 value: 441
    Time: 1636182895766 thread: parallel-2 value: 442
    Time: 1636182895766 thread: parallel-2 value: 443
    Time: 1636182895766 thread: parallel-6 value: 3321
    Time: 1636182895767 thread: parallel-9 value: 4321
    Time: 1636182895767 thread: parallel-11 value: 4421
    Time: 1636182895767 thread: parallel-12 value: 4431
    Time: 1636182895767 thread: parallel-12 value: 4432
    finished
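In the result above, the four roots are printed on main while every descendant is printed on a parallel-N thread, since only the inner Flux returned by the expander is moved with subscribeOn. A rough plain-Java model of that hand-off, with no Reactor dependency, might look like the sketch below. AsyncExpandSketch, Emission, run, and the "worker-N" thread names are all hypothetical, and a fixed thread pool merely stands in for Schedulers.parallel(); the sketch imitates which thread produces each value and is not how Reactor implements expand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-Java model; it does not use Reactor.
final class AsyncExpandSketch {

    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... nodes) {
            this.name = name;
            this.children = new ArrayList<>(Arrays.asList(nodes));
        }
    }

    // One produced value together with the name of the thread that produced it.
    static final class Emission {
        final String value;
        final String thread;

        Emission(String value, String thread) {
            this.value = value;
            this.thread = thread;
        }

        @Override
        public String toString() {
            return value + "@" + thread;
        }
    }

    // Expands level by level; each node's children are produced by a pool task.
    static List<Emission> expand(List<Node> roots, ExecutorService pool) {
        List<Emission> out = new ArrayList<>();
        String caller = Thread.currentThread().getName();
        for (Node root : roots) {
            out.add(new Emission(root.name, caller)); // roots stay on the caller thread
        }
        List<Node> level = roots;
        while (!level.isEmpty()) {
            List<Future<List<Emission>>> pending = new ArrayList<>();
            List<Node> next = new ArrayList<>();
            for (Node node : level) {
                // The "expander" for this node runs on a pool thread.
                pending.add(pool.submit(() -> {
                    String worker = Thread.currentThread().getName();
                    List<Emission> produced = new ArrayList<>();
                    for (Node child : node.children) {
                        produced.add(new Emission(child.name, worker));
                    }
                    return produced;
                }));
                next.addAll(node.children);
            }
            try {
                // Collect results in submission order, so the output order is stable.
                for (Future<List<Emission>> future : pending) {
                    out.addAll(future.get());
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
            level = next;
        }
        return out;
    }

    // Runs the sketch on a small subset of the test tree with four named workers.
    static List<Emission> run() {
        AtomicInteger ids = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(
                4, task -> new Thread(task, "worker-" + ids.incrementAndGet()));
        try {
            List<Node> roots = Arrays.asList(
                    new Node("1", new Node("11")),
                    new Node("2", new Node("21"), new Node("22", new Node("221"))));
            return expand(roots, pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Unlike the sleep-based example, this sketch waits on each Future before returning, so the caller knows the expansion is complete without guessing a delay.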