如何在项目反应器中设置阻塞异步请求/响应?

我正在连接一个 ANT+ USB 记忆棒,并用项目反应器替换我自己的天真“MessageBus”,因为它看起来非常合适。

USB 接口本质上是异步的(单独的输入/输出管道),我想以阻塞方式处理一组请求/响应消息。


我已经设置了一个单独的线程,它不断地从 usb in-pipe 中读取消息并将它们写入一个接收器,该接收器提供一个共享的 Flux,任何人都可以订阅。这似乎工作正常。


目前我向 usb 管道发送一条消息,然后在共享通量上使用 .filter() 和 .blockFirst() :(人为代码)


    /**

     * Puts a message on the Usb  out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}

     *

     * @param message Message to send.

     * @return related response message.

     */


    public AntMessage sendBlocking(AntBlockingMessage message) {

        send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void

        // bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.

        return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite)  Flux<AntMessage>

                .filter(antMessage -> antMessage.getMessageId() == message.getMessageId())

                .blockFirst(Duration.ofSeconds(10));

    }

问题是 usb 记忆棒甚至在 flux 被激活之前就可以响应,从而导致 TimeoutException。

添加一个Thread.sleep(10)到 usb 读卡器“解决”了这个问题,但是实现这种阻塞行为的正确方法是什么?


设置订阅(使用 .take(1)),发送消息然后阻塞订阅?

设置一个发送和等待正确响应都完成的 Flux?

我想不通...


冉冉说
浏览 145回答 2
2回答

至尊宝的传说

我找到了一个可行的解决方案,但我不确定它是否是最好的:我设置了一个用于发送异步消息的 Mono,并将其与一个过滤匹配消息的 Flux 合并。看到 Mono 从不发出值,我知道合并的第一个对象是来自我的 Flux 的响应消息,所以我可以将它转换为正确的类型。这仍然感觉有点脏,但话又说回来,尝试使用用于异步工作的框架获得阻塞行为总是感觉有点脏......&nbsp; &nbsp; public AntMessage sendBlocking(AntBlockingMessage requestMessage) {&nbsp; &nbsp; &nbsp; &nbsp; Flux<AntMessage> response = this.antUsbReader.antMessages()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .take(1);&nbsp; &nbsp; &nbsp; &nbsp; Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));&nbsp; &nbsp; &nbsp; &nbsp; return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));&nbsp; &nbsp; }&nbsp; &nbsp; private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {&nbsp; &nbsp; &nbsp; &nbsp; if (message instanceof RequestMessage) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return response.getMessageId() == message.getMessageId();&nbsp; &nbsp; }

慕桂英546537

查看您的代码后,我会在球场上提出一些建议。我是用手机写的,所以还没有测试过。但是我们先写,然后阻塞 1 秒,然后我们返回过滤响应的提取。Flux<AntMessage>&nbsp;response&nbsp;=&nbsp;Mono.fromRunnable(()&nbsp;->&nbsp;this.antUsbWriter.write(requestMessage)) &nbsp;&nbsp;&nbsp;&nbsp;.block(Duration.ofSeconds(1)) &nbsp;&nbsp;&nbsp;&nbsp;.thenReturn(this.antUsbReader.antMessages() &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.filter(responseMessage&nbsp;->&nbsp;isMatchingResponse(requestMessage,&nbsp;responseMessage)) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.take(1));
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java