我正在尝试获取每个可用分区的当前偏移量。根据文档,consumer.position应该可以解决问题,所以我这样尝试:
consumer = Consumer({
'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.CONSUMER_GROUP,
'enable.auto.commit': False,
})
# get all topics
topics = consumer.list_topics()
# get all partitions
partitions = []
for name, meta in topics.topics.items():
for partition_id in meta.partitions.keys():
part = TopicPartition(name, partition_id)
partitions.append(part)
# get all offsets
x = consumer.position(partitions)
但是,结果分区中的所有偏移量x仍然是-1001.
如果我使用镜头或其他工具进行检查,我可以看到这个结果不正确,我正在取消的消费者组已经消费了消息并将它们提交给 Kafka。
偶然的你
慕尼黑5688855
相关分类