猿问

升级到 Spring Integration 5 后,使用消息不再起作用

我正在尝试将使用 Spring Integration 4.3 和 Spring Boot 1.6 的项目升级到 Spring Integration 5.1 和 Spring Boot 2.1。以前我有以下配置:


IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")

                    .id("myId")

                    .autoStartup(autoStartup)

                    .prefetchCount(10)

                    .concurrentConsumers(2)

                    .maxConcurrentConsumers(3)

                    .messageConverter(messageConverter()))

                    .aggregate(a -> a.correlationExpression("payload.entityId")

                                    .releaseExpression("size() eq iterator().next().payload.batchSize")

                                    .sendPartialResultOnExpiry(true)

                                    .groupTimeout(2000)

                                    .expireGroupsUponCompletion(true)

                                    .outputProcessor(myMessageGroupProcessor))

                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))

                    .get();

在升级过程中,我尝试按照此处的文档进行操作,因此将配置更改为:


@Configuration

@EnableAutoConfiguration

@EnableIntegration

public class SpringConfig {


    @Bean(name = "myFlowId")

    public IntegrationFlow myFlow(ConnectionFactory connectionFactory, ServiceActivatorBean serviceActivatorBean,

                                  @Value("${spring.integration.flow.auto-startup:true}") boolean autoStartup,

                                  MyMessageGroupProcessor myMessageGroupProcessor) {

    }

}

但是,当我发布消息时,集成流似乎没有接收/处理它们。我没有收到任何错误日志(或者即使我启用了调试日志记录也没有任何日志),而且我不太确定从哪里开始调试。我很肯定消息实际上已发布到 RabbitMQ,所以这不是问题。我会错过什么?


慕侠2389804
浏览 106回答 1
1回答

撒科打诨

我的问题实际上不是由于 Spring Integration,而是与 Spring AMQP 的变化有关。以前的“可声明”可以这样创建:@BeanList<Binding> myBinding() {&nbsp; &nbsp; return List.of(<binding1>, <binding2>, ..)}但在 Spring AMQP 2.1 中,这应该更改为:@BeanDeclarables myBinding() {&nbsp; &nbsp; return new Declarables(List.of(<binding1>, <binding2>, ..))}请参阅此处的文档。顺便说一句,我releaseExpression的也错了,应该是size() eq one.payload.batchSize。
随时随地看视频慕课网APP

相关分类

Java
我要回答