我正在尝试配置以下流程:当消息到达 Rabbit 队列时尝试获取锁,查询远程文件服务器以获取某些文件,并为找到的每个文件向另一个队列发送新消息,并在发送所有文件后释放锁文件。
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.filter(m -> lockService.acquire())
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.aggregate()
.handle(m -> {
log.info("Releasing lock");
lock.release();
})
.get();
问题是流在第.handle一种方法之后停止(老实说,正如预期的那样),我无法弄清楚如何配置它来做我想做的事?我尝试使用.wireTapand.publishSubscribeChannel但这使得 2 个流相互不依赖,并且我的锁在文件实际发送之前被释放。
如果有人可以帮助我解释如何使用 DSL 修复它会很棒,因为我正在动态创建这些流......
米脂
相关分类