我正在使用 Spring Kafka 2.2.7,我已经配置@EnableKafka并kafkaListenerContainerFactory使用它@KafkaListener来消费消息,一切都按预期工作。
我想添加一个RecordInterceptor来记录所有消耗的消息,但发现很难配置它。文档指出 RecordInterceptor 可以在容器上设置,但是我不确定如何获取容器的实例。
从2.2.7版本开始,可以向监听器容器添加RecordInterceptor;它将在调用侦听器之前调用,以允许检查或修改记录。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
factory.setConcurrency(consumerCount);
return factory;
}
我浏览了 Spring 文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了一些东西。
在这方面的任何帮助将不胜感激。
提前致谢。
莫回无
相关分类