Kafka consumer.poll 不返回任何记录

当我使用新的组 ID 注册消费者时,前 N 次轮询调用不返回任何内容。


我想测试当我调用服务时,会发布一个 Kafka 事件。问题是每当我更改 groupId 时,前 N 个民意调查都不会返回任何内容。我了解 Kafka 在轮询时首先注册消费者,但我发现注册消费者所需的轮询次数(时间)过于随机。


消费者配置:


Properties props = new Properties();

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);

props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);


KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList(TOPIC_NAME));

脚步:

  1. 在每次测试之前,我consumer.poll(Duration.ofSeconds(5))只是为了确保消费者已注册并设置了偏移量。

  2. 我调用服务并断言响应。如果我使用 UI 检查 Kafka,则会发布事件。

  3. 我打电话consumer.poll(Duration.ofSeconds(5)),希望能收到一些记录。这是失败的一步

有没有办法确保第二次投票总是返回记录?我试图让第一次投票持续 1 分钟(我已经认为 5 秒对于等待每次测试来说太长了),它有时仍然有效,有时无效。

谢谢。


蝴蝶刀刀
浏览 422回答 1
1回答

一只萌萌小番薯

它不适用于您的“新 groupId”的原因是您处于“最新”模式。默认值为“最新”,您需要处于“最早”模式或使用您的“新 groupId”首次轮询或为此主题的此“新 groupId”提交偏移量。您需要将“groupId”注册到主题,而不是消费者。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java