猿问

如何为我的消费者获取所有分区的当前偏移量?

我正在尝试获取每个可用分区的当前偏移量。根据文档,consumer.position应该可以解决问题,所以我这样尝试:


consumer = Consumer({

    'bootstrap.servers': config.BOOTSTRAP_SERVERS,

    'group.id': config.CONSUMER_GROUP,

    'enable.auto.commit': False,

})


# get all topics

topics = consumer.list_topics()


# get all partitions

partitions = []

for name, meta  in topics.topics.items():

    for partition_id in meta.partitions.keys():

        part = TopicPartition(name, partition_id)

        partitions.append(part)


# get all offsets

x = consumer.position(partitions)

但是,结果分区中的所有偏移量x仍然是-1001.


如果我使用镜头或其他工具进行检查,我可以看到这个结果不正确,我正在取消的消费者组已经消费了消息并将它们提交给 Kafka。


互换的青春
浏览 222回答 2
2回答

偶然的你

作为参考,这是有效的解决方案:consumer = Consumer({    'bootstrap.servers': config.BOOTSTRAP_SERVERS,    'group.id': config.CONSUMER_GROUP,    'enable.auto.commit': False,})# get all topicstopics = consumer.list_topics()# get all partitionspartitions = []for name, meta  in topics.topics.items():    for partition_id in meta.partitions.keys():        part = TopicPartition(name, partition_id)        partitions.append(part)# get last committed offsetspartitions = consumer.committed(partitions)显然consumer.position不像宣传的那样工作,但consumer.committed会返回存储的偏移量,即使消费者当前没有订阅主题/分区。

慕尼黑5688855

尝试添加consumer.subscribe()或consumer.assign()功能consumer = Consumer({    'bootstrap.servers': config.BOOTSTRAP_SERVERS,    'group.id': config.CONSUMER_GROUP,    'enable.auto.commit': False,})# get all partitionspartitions = []for name, meta  in topics.topics.items():    for partition_id in meta.partitions.keys():        part = TopicPartition(name, partition_id)        partitions.append(part)consumer.assign(partitions)committed = consumer.committed(tp)last_offset = consumer.position(tp)print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset))
随时随地看视频慕课网APP

相关分类

Python
我要回答