这是我目前的设置:
queue1 和 queue2 与流向 channel1 的集成流一起标记:
@Bean
public IntegrationFlow q1f() {
return IntegrationFlows
.from(queue1InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
@Bean
public IntegrationFlow q2f() {
return IntegrationFlows
.from(queue2InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
然后,一切都被聚合,然后在聚合消息被rabbitmq确认后确认:
@Bean
public IntegrationFlow aggregatingFlow() {
return IntegrationFlows
.from(amqpInputChannel())
.aggregate(...
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10))
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
)
.handle(amqpOutboundEndpoint())
.get();
}
@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
outboundEndpoint.setConfirmAckChannel(manualAckChannel());
outboundEndpoint.setConfirmCorrelationExpressionString("#root");
outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
return outboundEndpoint;
}
ackTemplate()用 cf 设置springFactory.setPublisherConfirms(true);。
我看到的问题是,每 10 天一次,有一些消息卡unacknowledged在 rabbitmq 中的状态。
我的猜测是,消息的发布以某种方式等待兔子做,PUBLISHER CONFIRMS但它永远不会得到它并超时?在这种情况下,我从不 ACK 中的消息queue1。这可能吗?
因此,只需再完成一次工作流程:
[两个队列->直接通道->聚合器(保留通道和标记值)->发布到兔子->兔子通过发布者确认返回ACK->spring确认通道上的所有消息+它为聚合消息保存在内存中的值]
FFIVE
紫衣仙女
相关分类