我正在连接一个 ANT+ USB 记忆棒,并用项目反应器替换我自己的天真“MessageBus”,因为它看起来非常合适。
USB 接口本质上是异步的(单独的输入/输出管道),我想以阻塞方式处理一组请求/响应消息。
我已经设置了一个单独的线程,它不断地从 usb in-pipe 中读取消息并将它们写入一个接收器,该接收器提供一个共享的 Flux,任何人都可以订阅。这似乎工作正常。
目前我向 usb 管道发送一条消息,然后在共享通量上使用 .filter() 和 .blockFirst() :(人为代码)
/**
* Puts a message on the Usb out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}
*
* @param message Message to send.
* @return related response message.
*/
public AntMessage sendBlocking(AntBlockingMessage message) {
send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void
// bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.
return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite) Flux<AntMessage>
.filter(antMessage -> antMessage.getMessageId() == message.getMessageId())
.blockFirst(Duration.ofSeconds(10));
}
问题是 usb 记忆棒甚至在 flux 被激活之前就可以响应,从而导致 TimeoutException。
添加一个Thread.sleep(10)到 usb 读卡器“解决”了这个问题,但是实现这种阻塞行为的正确方法是什么?
设置订阅(使用 .take(1)),发送消息然后阻塞订阅?
设置一个发送和等待正确响应都完成的 Flux?
我想不通...
至尊宝的传说
慕桂英546537
相关分类