我们有一个多组件应用程序,它在组件之间提供反应式流API。一些组件使用Akka流实现,其他组件使用例如反应器。
在一个组件中,我们注意到 Streams 不处理任何消息,尽管提供的发布者提供记录。我把问题归结为以下情况:
Publisher<String> stringPublisher = Source .from(Lists.newArrayList("Hello", "World", "!")) .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer); Source<String, NotUsed> allMessages = Source .fromPublisher(stringPublisher) .toMat(BroadcastHub.of(String.class, 256), Keep.right()) .run(materializer); allMessages .runForeach(System.out::println, materializer) .toCompletableFuture() .get();
一个组件提供发布服务器(它需要是发布者,因为 API 使用反应式流 API,而不是 Akka 流 API)。此发布服务器是从另一个 Akka 流源创建的,并使用 转换为发布服务器。Sink.asPublisher
现在,当我们使用 BroadcastHub 从发布者开始实现流时,根本不会处理任何记录。
我对反应堆出版商也试过同样的方法:
Publisher<String> stringPublisher = Flux.fromIterable(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
这按预期工作。不幸的是,我不能排除另一个组件从 Akka 流源创建其发布服务器的情况。
有没有人知道出了什么问题?
浮云间
相关分类