如何将 RecordInterceptor 设置为 ConcurrentKafkaListener

我正在使用 Spring Kafka 2.2.7,我已经配置@EnableKafka并kafkaListenerContainerFactory使用它@KafkaListener来消费消息,一切都按预期工作。


我想添加一个RecordInterceptor来记录所有消耗的消息,但发现很难配置它。文档指出 RecordInterceptor 可以在容器上设置,但是我不确定如何获取容器的实例。


从2.2.7版本开始,可以向监听器容器添加RecordInterceptor;它将在调用侦听器之前调用,以允许检查或修改记录。


    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(createConsumerFactory());

        factory.setConcurrency(consumerCount);

        return factory;

    }

我浏览了 Spring 文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了一些东西。


在这方面的任何帮助将不胜感激。


提前致谢。


慕码人8056858
浏览 99回答 1
1回答

莫回无

有一个方法setRecordInterceptor因为2.2.7factory.setRecordInterceptor(new RecordInterceptor);另一条信息RecordInterceptor不适用于批处理侦听器从2.2.7版本开始,可以向监听器容器添加RecordInterceptor;它将在调用侦听器之前调用,以允许检查或修改记录。如果拦截器返回 null,则不会调用侦听器。当侦听器是批处理侦听器时,不会调用拦截器。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java