Spring Cloud 流/Kafka 异常

我对使用 spring 云流和 kafka 的服务有问题。该服务一直工作正常,但昨天开始报告启动时出现一系列异常:

一段时间后,我们会看到这样的异常:

Caused by: org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name '946859a6-bc27-466d-91ba-3da93af50ac9:1' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '946859a6-bc27-466d-91ba-3da93af50ac9:1' available

与 kafka 的连接配置了一个属性: spring.kafka.bootstrap-server = kafka.kafka:9092

并且主题配置了 Spring Cloud 流属性: spring.cloud.stream.bindings.[topic-name].destination = blah

与 kafka 的交互通过 spring 与如下代码集成进行:

@MessagingGatewaypublic interface StreamGateway {
    @Gateway(requestChannel = KafkaConfig.ENRICH_PAYMENT, replyChannel = ChannelNames.PAYMENT_REPLY, replyTimeout = 10000)
    String processPayment(String payload);}

//不同的类:

private final StreamGateway gateway;...gateway.processPayment(message)

这是在 azure kubernetes 部署上运行,而 kafka 与 spring boot 服务位于一个单独的 pod 中。

提前致谢。

更新: 问题再次发生,一些进一步的调查强调了一些事情

  • 因为我们使用 spring 集成@MessagingGateway@Gateway创建与 Kafka 的同步交互,所以没有正常的主题StreamListener或订阅者

  • 当主题存在滞后时会出现问题,即主题中存在超出主题偏移量的消息。

  • 缺少正常StreamListener意味着无法处理滞后消息。只有当 MessageGateway 建立连接时,才有可能从主题中读取消息。

  • 解决问题的一种方法是读取所有“滞后”消息,使滞后为 0。然后服务将正常启动,但是如果我手动将消息发布到主题(与 MessageGateway 交互),则错误再次发生。

  • 第二个部分解决方案(我还没有完全理解)是向@DependsOnMessageGateway添加一个注释,表明它需要一个使用@InputSubscribableChannel 对象单独创建的 bean 。这意味着 SubscribableChannel 必须在 MessageGateway 之前创建,因此创建了一个订阅者,但是仍然没有 StreamListener,因此当从主题中拉出滞后消息时仍然会抛出异常,无处可去 🤨


慕尼黑8549860
浏览 237回答 2
2回答

拉风的咖菲猫

在查看调试日志时,我注意到该服务正确连接到其他主题,但付款回复主题存在问题。我尝试删除此主题并重新启动服务。这解决了问题。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java