Consumer.endOffsets 在 Kafka 中是如何工作的?

假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交偏移和结束偏移。类似于 Kafka 控制台消费者组脚本的工作方式,但它适用于所有组。


就像是


单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)


Consumer consumer;


static {

  consumer = createConsumer();

}


run() { 

  List<String> groupIds = getConsumerGroups();

  for(String groupId: groupIds) {

       List<TopicParition> topicParitions =  getTopicParitions(groupId);

       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)

   }

}

多个消费者 - 工作


run() { 

   List<String> groupIds = getConsumerGroups();

   for(String groupId: groupIds) {

        List<TopicParition> topicParitions =  getTopicParitions(groupId);

        Consumer consumer = createConsumer();

        consumer.endOffsets(topicParitions); This works!!!

   }

 }

版本:Kafka-Client 2.0.0


我是否错误地使用了消费者 API?理想情况下,我想使用单个消费者。


如果您需要更多详细信息,请告诉我。


繁花如伊
浏览 500回答 2
2回答

HUH函数

我想你快到了。首先收集您感兴趣的所有主题分区,然后发出consumer.endOffsets命令。请记住,我还没有尝试运行它,但是这样的事情应该可以工作:run() {&nbsp;&nbsp; &nbsp;Consumer consumer = createConsumer();&nbsp; &nbsp;List<String> groupIds = getConsumerGroups();&nbsp; &nbsp;List<TopicPartition> topicPartitions = new ArrayList<>();&nbsp; &nbsp;for (String groupId: groupIds) {&nbsp; &nbsp; &nbsp; &nbsp; topicPartitions.addAll(getTopicPartitions(groupId));&nbsp; &nbsp;}&nbsp; &nbsp;consumer.endOffsets(topicPartitions);&nbsp;}

繁星淼淼

这是一个Fetcher.fetchOffsetsByTimes()特别在groupListOffsetRequests方法内部的错误,其中逻辑没有添加用于重试的分区,其中用于请求分区偏移量的领导者未知或不可用。当您在所有消费者组分区中使用单个消费者时,这一点更加明显,其中一些组在我们请求时已经拥有主题分区领导信息,endoffsets而对于没有领导信息的主题分区,未知或不可用的主题分区由于错误而被忽略.后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是进行更改以从中读取主题分区AdminClient.listTopics & AdminClient.describeTopics并一次性传递给Consumer.endOffsets.尽管这完全不能解决问题,因为主题/分区在多次运行之间可能仍然不可用或未知。可以找到更多信息 -&nbsp;KAFKA-7044&&nbsp;pull request。这已被修复并计划在 2.1.0 版本中发布。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java