我有以下RabbitMQ配置
@Configuration
@IntegrationComponentScan
public class RabbitConfig {
@Autowired // TODO constructor!
private ConnectionFactory connectionFactory;
public RabbitConfig(
@Value("${article.inbound.queue}") String queueName,
@Value("${article.inbound.exchange}") String exchangeName,
@Value("${article.inbound.routingkey}") String routingKey) {
this.queueName = queueName;
this.exchangeName = exchangeName;
this.routingKey = routingKey;
}
@Bean
Exchange exchange() {
return ExchangeBuilder
.topicExchange(this.exchangeName)
.durable(true)
.build();
}
@Bean
Queue queue() {
return QueueBuilder.durable(queueName).build();
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(exchange())
.with(routingKey)
.noargs();
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleMessageListenerContainer articleListenerContainer(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
container.setMessageConverter(jsonMessageConverter());
return container;
}
}
和简单的 Spring 集成流程
@Bean
IntegrationFlow fromMessageBroker(SimpleMessageListenerContainer messageListener) {
return IntegrationFlows.from(Amqp.inboundAdapter(messageListener))
.log()
.handle(message -> {
final MessageHeaders headers = message.getHeaders();
final Object assetId = headers.get("assetId");
log.info(assetId);
})
.get();
}
当我启动 Spring Boot 时,一切都很好,直到我发布消息以交换我定义的队列已绑定。然后所有处理进一步进行,在 SimpleMessageListenerContainer 崩溃之后。
正如你在日志中看到的那样,流到最后,然后这个容器崩溃了。不幸的是,异常跟踪中没有有用的信息。我想知道为什么会这样。我试图将此流中的这些消息重定向到 MessageChannel 然后处理它们,但没有帮助。
相关分类