猿问

如何在 KafkaBootstrapConfiguration 中覆盖 KafkaListener

我需要向 KafkaListenerEndpointRegistry 添加一些逻辑 - 我想为每个主题注册额外的侦听器(我想创建具有不同轮询时间的重试主题消费者链),我使用 @Listener 注释创建。为此,我想尝试覆盖 registerListenerContainer 方法并在那里实现逻辑。


我做的第一步是添加与 KafkaBootstrapConfiguration 相同的默认配置。但在那之后,我所有的测试都失败了,出于某种原因,我的听众没有消费任何东西。如果我不添加豆,一切都会正常。


@Configuration

@EnableKafka

public class CustomKafkaBootstrapConfiguration {


  @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)

  public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {

    return new KafkaListenerEndpointRegistry(){

        @Override

        public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {

            //i need to add logic here

            super.registerListenerContainer(endpoint, factory);

        }

    };

  }

}


偶然的你
浏览 140回答 1
1回答

慕田峪9158850

我刚刚复制了您的覆盖,一切都按预期进行。@SpringBootApplicationpublic class So57674940Application {&nbsp; &nbsp; public static void main(String[] args) {&nbsp; &nbsp; &nbsp; &nbsp; SpringApplication.run(new Class<?>[] { So57674940Application.class, So57674940ApplicationConfig.class }, args);&nbsp; &nbsp; }&nbsp; &nbsp; @KafkaListener(id = "so57674940", topics = "so57674940")&nbsp; &nbsp; public void listen(String in) {&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(in);&nbsp; &nbsp; }}@Configuration@EnableKafkaclass So57674940ApplicationConfig {&nbsp; &nbsp; @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)&nbsp; &nbsp; public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {&nbsp; &nbsp; &nbsp; &nbsp; return new KafkaListenerEndpointRegistry() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void registerListenerContainer(KafkaListenerEndpoint endpoint,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; KafkaListenerContainerFactory<?> factory) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // i need to add logic here&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("in custom registry");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.registerListenerContainer(endpoint, factory);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; }}和in custom registry2019-08-27 11:20:36.251&nbsp; INFO 33460 --- [o57674940-0-C-1] o.s.k.l.KafkaMessageListenerContainer&nbsp; &nbsp; : partitions assigned: [so57674940-0]
随时随地看视频慕课网APP

相关分类

Java
我要回答