我需要向 KafkaListenerEndpointRegistry 添加一些逻辑 - 我想为每个主题注册额外的侦听器(我想创建具有不同轮询时间的重试主题消费者链),我使用 @Listener 注释创建。为此,我想尝试覆盖 registerListenerContainer 方法并在那里实现逻辑。
我做的第一步是添加与 KafkaBootstrapConfiguration 相同的默认配置。但在那之后,我所有的测试都失败了,出于某种原因,我的听众没有消费任何东西。如果我不添加豆,一切都会正常。
@Configuration
@EnableKafka
public class CustomKafkaBootstrapConfiguration {
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry(){
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
//i need to add logic here
super.registerListenerContainer(endpoint, factory);
}
};
}
}
慕田峪9158850
相关分类