何时使用 ConcurrentKafkaListenerContainerFactory?

我是 kafka 的新手,我浏览了文档,但我什么都不懂。有人可以解释何时使用该ConcurrentKafkaListenerContainerFactory课程吗?我已经使用了该Kafkaconsumer课程,但我看到ConcurrentKafkaListenerContainerFactory在我当前的项目中使用了该课程。请解释它的用途。



临摹微笑
浏览 481回答 2
2回答

翻阅古今

Kafka 消费者不是线程安全的。所有网络 I/O 都发生在进行调用的应用程序的线程中。确保多线程访问正确同步是用户的责任。非同步访问将导致ConcurrentModificationException.如果消费者被分配了多个分区来获取数据,它将尝试同时从所有分区中消费,从而有效地为这些分区提供相同的消费优先级。但是,在某些情况下,消费者可能希望首先专注于从分配的分区的某个子集全速获取,并且仅在这些分区几乎没有或没有数据要消耗时才开始获取其他分区。春卡夫卡ConcurrentKafkaListenerContainerFactory用于为带注释的方法创建容器@KafkaListenerMessageListenerContainer春天卡夫卡有两个KafkaMessageListenerContainerConcurrentMessageListenerContainerKafkaMessageListenerContainer接收来自单个线程上所有主题或分区的所有消息。ConcurrentMessageListenerContainer委托给一个或多个实例KafkaMessageListenerContainer以提供多线程消费。使用 ConcurrentMessageListenerContainer@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; kafkaListenerContainerFactory() {&nbsp; &nbsp; ConcurrentKafkaListenerContainerFactory<Integer, String> factory =&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new ConcurrentKafkaListenerContainerFactory<>();&nbsp; &nbsp; factory.setConsumerFactory(consumerFactory());&nbsp; &nbsp; factory.setConcurrency(3);&nbsp; &nbsp; factory.getContainerProperties().setPollTimeout(3000);&nbsp; &nbsp; return factory;&nbsp; }它具有并发属性。例如, container.setConcurrency(3) 创建了三个KafkaMessageListenerContainer实例。如果TopicPartition提供了6个实例,并发为3;每个容器有两个分区。对于五个 TopicPartition 实例,两个容器获得两个分区,第三个获得一个。如果并发大于 TopicPartition 的数量,则将并发调低,使每个容器获得一个分区。

梵蒂冈之花

Kafka Consumer API 不是线程安全的。ConcurrentKafkaListenerContainerFactory api 提供了使用 Kafka Consumer API 的并发方式以及设置其他 kafka 消费者属性。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java