我对使用 spring 云流和 kafka 的服务有问题。该服务一直工作正常,但昨天开始报告启动时出现一系列异常:
一段时间后,我们会看到这样的异常:
Caused by: org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name '946859a6-bc27-466d-91ba-3da93af50ac9:1' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '946859a6-bc27-466d-91ba-3da93af50ac9:1' available
与 kafka 的连接配置了一个属性: spring.kafka.bootstrap-server = kafka.kafka:9092
并且主题配置了 Spring Cloud 流属性: spring.cloud.stream.bindings.[topic-name].destination = blah
与 kafka 的交互通过 spring 与如下代码集成进行:
@MessagingGatewaypublic interface StreamGateway { @Gateway(requestChannel = KafkaConfig.ENRICH_PAYMENT, replyChannel = ChannelNames.PAYMENT_REPLY, replyTimeout = 10000) String processPayment(String payload);}
//不同的类:
private final StreamGateway gateway;...gateway.processPayment(message)
这是在 azure kubernetes 部署上运行,而 kafka 与 spring boot 服务位于一个单独的 pod 中。
提前致谢。
更新: 问题再次发生,一些进一步的调查强调了一些事情
因为我们使用 spring 集成@MessagingGateway
并@Gateway
创建与 Kafka 的同步交互,所以没有正常的主题StreamListener
或订阅者
当主题存在滞后时会出现问题,即主题中存在超出主题偏移量的消息。
缺少正常StreamListener
意味着无法处理滞后消息。只有当 MessageGateway 建立连接时,才有可能从主题中读取消息。
解决问题的一种方法是读取所有“滞后”消息,使滞后为 0。然后服务将正常启动,但是如果我手动将消息发布到主题(与 MessageGateway 交互),则错误再次发生。
第二个部分解决方案(我还没有完全理解)是向@DependsOn
MessageGateway添加一个注释,表明它需要一个使用@Input
SubscribableChannel 对象单独创建的 bean 。这意味着 SubscribableChannel 必须在 MessageGateway 之前创建,因此创建了一个订阅者,但是仍然没有 StreamListener,因此当从主题中拉出滞后消息时仍然会抛出异常,无处可去 🤨
拉风的咖菲猫
相关分类