Reactive Streams:如何按键等待所有发布者?

假设我有 3 个发布者和 1 个处理者。发布者以 形式发布项目{key: <integer>, value: <object>, publisher_id: <string>}

发布者进行 IO 操作,因此:

  • 一方面,我希望出版商N在特定时刻处理(大致)项目。

  • 另一方面,我希望消费者将项目合并{key: <integer>, values: <list>}到一个记录中(即)

我实际上已经实现了一个FluxProcessor具有内部存储空间 ( ConcurrentHashMap) 来保存所有项目的。request()只要未达到 CAPACITY,它就会手动添加新项目。

我想知道是否有内置功能可以使用 RxJava(2)/ Spring Reactor API 来做到这一点?


慕雪6442864
浏览 101回答 1
1回答

慕姐8265434

在 RxJava 2 中使用 merge、rebatchRequests 和 toMultimap:Flowable<KeyValuePublisher> source1 = ...Flowable<KeyValuePublisher> source2 = ...Flowable<KeyValuePublisher> source3 = ...Flowable.merge(&nbsp; &nbsp; source1.rebatchRequests(N),&nbsp; &nbsp; source2.rebatchRequests(N),&nbsp; &nbsp; source3.rebatchRequests(N)).toMultimap(kvp -> kvp.key, kvp -> kvp.value)subscribe(map -> System.out.println(map));
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java