流后的Spring DSL句柄?

我正在尝试配置以下流程:当消息到达 Rabbit 队列时尝试获取锁,查询远程文件服务器以获取某些文件,并为找到的每个文件向另一个队列发送新消息,并在发送所有文件后释放锁文件。


IntegrationFlows.from(Amqp.inboundGateway(container)

    .messageConverter(messageConverter)

)

    .filter(m -> lockService.acquire())

    .transform(m -> remoteFileTemplate.list(inputDirectory))

    .split()

    .handle(Amqp.outboundAdapter(amqpTemplate)

        .exchangeName("")

        .routingKey(routingKey)

    .aggregate()

    .handle(m -> {

      log.info("Releasing lock");

      lock.release();

    })

    .get();

问题是流在第.handle一种方法之后停止(老实说,正如预期的那样),我无法弄清楚如何配置它来做我想做的事?我尝试使用.wireTapand.publishSubscribeChannel但这使得 2 个流相互不依赖,并且我的锁在文件实际发送之前被释放。


如果有人可以帮助我解释如何使用 DSL 修复它会很棒,因为我正在动态创建这些流......


翻过高山走不出你
浏览 121回答 1
1回答

米脂

拆分后的 pub/sub,在一个子流上使用 AMQP 处理程序,而在另一个子流上使用聚合器应该可以正常工作。每个都将在同一个线程上连续调用,最后一条消息导致从聚合器释放,再次在同一个线程上。话虽如此,您将需要在入站网关上进行一些 errorChannel 处理,以在发生错误时释放锁。编辑一个不太复杂的解决方案是ChannelInterceptor在转换之前而不是过滤器之前在通道上进行自定义,以锁定preSend()并释放它afterSendCompleted()(成功和失败都调用它)。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java