我的WebSocketHandler实现中有这个:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(
session.receive()
.flatMap(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) -> {
try {
sink.next(objectMapper.writeValueAsString(o));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
})
.map(session::textMessage);
return publisher;
})
);
}
当前Flux<EfficiencyData>生成的用于在服务中进行测试如下:
public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) {
return Flux.interval(Duration.ofSeconds(1))
.map(aLong -> {
longAdder.increment();
return new EfficiencyData(new MachineSpeed(
RotationSpeed.ofRpm(longAdder.intValue()),
RotationSpeed.ofRpm(0),
RotationSpeed.ofRpm(400)));
}).publish().autoConnect();
}
我用publish().autoConnect()它来使它成为热流。我创建了一个单元测试,它启动了 2 个线程,这些线程在返回的内容上执行此操作Flux:
flux.log().handle((s, sink) -> {
LOGGER.info("{}", s.getMachineSpeed().getCurrent());
}).subscribe();
在这种情况下,我看到两个线程每秒都打印出相同的值。
但是,当我打开 2 个浏览器选项卡时,我在两个网页中看不到相同的值。连接的 websocket 客户端越多,值之间的差异就越大(因此原始 Flux 中的每个值似乎都发送到不同的客户端,而不是发送到所有客户端)。
白板的微信
相关分类