Kafka 2 消费者工厂监听器不是一直连接

我们在我们的项目中使用 Spring Kafka 2.1.4.RELEASE 版本,我们有以下配置:


@EnableKafka

public class KafkaConfig {


    @Value("${spring.kafka.bootstrap-servers}")

    private String bootstrapServers;


    @Configuration

    class ProducerConfig {

        @Bean

        public Map<String, Object> producerConfigs() {

            Map<String, Object> props = new HashMap<>();

            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ASerializer.class);

            return props;

        }


        @Bean

        public ProducerFactory<String, A> producerFactory() {

            return new DefaultKafkaProducerFactory<>(producerConfigs());

        }


        @Bean

        public KafkaTemplate<String, A> kafkaTemplate() {

            return new KafkaTemplate<>(producerFactory());

        }

    }


    @Configuration

    class ConsumerConfig {

        @Value("${spring.kafka.consumer.group-id}")

        private String groupId;

        @Value("${spring.kafka.consumer.auto-offset-reset}")

        private String autoOffsetReset;

        @Value("${spring.kafka.consumer.enable-auto-commit}")

        private boolean enableAutoCommit;

        @Value("${spring.kafka.consumer.max-poll-records}")

        private Integer maxPollRecords;


        @Bean

        public Map<String, Object> firstConsumerConfig() {

            Map<String, Object> props = getCommonConsumerConfig();

            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ADeserializer.class);

            return props;

        }



所以我们在启动这个应用程序时注意到它并不是一直连接到这两个主题。有时它仅连接到第二个主题或仅连接到第一个主题,并且可能连接到第一个和第二个主题(这是正确的)。那么你能帮助理解这里配置错误吗?


胡子哥哥
浏览 170回答 2
2回答

繁花如伊

通常最佳做法是将每个侦听器放在不同的位置group.id(您可以使用覆盖消费者工厂的groupId属性@KafkaListener)。否则,当第二个开始时,第一个会导致重新平衡。当前的 2.1.x 版本是 2.1.10。

互换的青春

好的,经过更多调查后,我能够确定我这边发生了什么样的问题。所以基本上我们有一个包含多个主题的消费者组。因此,在我的情况下,我们为每个主题设置了 0 个分区(据我所知,没有分区,我们使用主题的 1 个队列进行操作)。因此,当我连接到该 kafka 实例时 - 所有消费者都连接到这些主题,但是当有人也连接到该主题(可能是我的同事)时,正在发生重新平衡,他开始听这些主题之一而不是我(由于事实上每个分区只能有一个使用者)。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java