当我使用新的组 ID 注册消费者时,前 N 次轮询调用不返回任何内容。
我想测试当我调用服务时,会发布一个 Kafka 事件。问题是每当我更改 groupId 时,前 N 个民意调查都不会返回任何内容。我了解 Kafka 在轮询时首先注册消费者,但我发现注册消费者所需的轮询次数(时间)过于随机。
消费者配置:
Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
脚步:
在每次测试之前,我consumer.poll(Duration.ofSeconds(5))
只是为了确保消费者已注册并设置了偏移量。
我调用服务并断言响应。如果我使用 UI 检查 Kafka,则会发布事件。
我打电话consumer.poll(Duration.ofSeconds(5))
,希望能收到一些记录。这是失败的一步。
有没有办法确保第二次投票总是返回记录?我试图让第一次投票持续 1 分钟(我已经认为 5 秒对于等待每次测试来说太长了),它有时仍然有效,有时无效。
谢谢。
一只萌萌小番薯
相关分类