Spring 5 反应式 websockets:客户端没有从热流中接收到相同的数据

我的WebSocketHandler实现中有这个:


@Override

public Mono<Void> handle(WebSocketSession session) {


    return session.send(

       session.receive()

              .flatMap(webSocketMessage -> {

                  int id = Integer.parseInt(webSocketMessage.getPayloadAsText());


                  Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);

                  var publisher = flux

                      .<String>handle((o, sink) -> {

                         try {

                            sink.next(objectMapper.writeValueAsString(o));

                         } catch (JsonProcessingException e) {

                            e.printStackTrace();                               

                         }

                      })

                      .map(session::textMessage);


                  return publisher;

              })

    );

}

当前Flux<EfficiencyData>生成的用于在服务中进行测试如下:


public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) {

    return Flux.interval(Duration.ofSeconds(1))

               .map(aLong -> {

                   longAdder.increment();

                   return new EfficiencyData(new MachineSpeed(

                           RotationSpeed.ofRpm(longAdder.intValue()),

                           RotationSpeed.ofRpm(0),

                           RotationSpeed.ofRpm(400)));

               }).publish().autoConnect();

}

我用publish().autoConnect()它来使它成为热流。我创建了一个单元测试,它启动了 2 个线程,这些线程在返回的内容上执行此操作Flux:


flux.log().handle((s, sink) -> {

            LOGGER.info("{}", s.getMachineSpeed().getCurrent());

        }).subscribe();

在这种情况下,我看到两个线程每秒都打印出相同的值。


但是,当我打开 2 个浏览器选项卡时,我在两个网页中看不到相同的值。连接的 websocket 客户端越多,值之间的差异就越大(因此原始 Flux 中的每个值似乎都发送到不同的客户端,而不是发送到所有客户端)。


qq_花开花谢_0
浏览 69回答 1
1回答

白板的微信

问题是对于每个连接的 websocket 客户端,我都会调用该service.subscribeToEfficiencyData(id)方法,每次调用它都会返回一个新的Flux。因此,当然,这些独立的 Flux 不会在不同的 websocket 客户端之间共享。为了解决这个问题,我Flux在构造函数或PostConstruct我的服务的方法中创建实例,以便subscribeToEfficiencyData每次都返回相同的 Flux 实例。请注意,.publish().autoConnect()在 Flux 上仍然很重要,因为没有那个 websocket 客户端将再次看到不同的值!
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java