如何使用kafka-python计算主题中的记录(消息)数量

正如标题中所说,我想在我的主题中获得一些记录,但我找不到使用 kafka-python 库的解决方案。有人有什么主意吗 ?



杨魅力
浏览 173回答 4
4回答

慕哥9229398

主要思想是计算主题的每个分区中有多少条消息并对所有这些数字求和。结果是有关该主题的消息总数。我使用confluence_kafka作为主库。from confluent_kafka import Consumer, TopicPartitionfrom concurrent.futures import ThreadPoolExecutorconsumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})def get_partition_size(topic_name: str, partition_key: int):    topic_partition = TopicPartition(topic_name, partition_key)    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)    partition_size = high_offset - low_offset    return partition_sizedef get_topic_size(topic_name: str):    topic = consumer.list_topics(topic=topic_name)    partitions = topic.topics[topic_name].partitions    workers, max_workers = [], len(partitions) or 1    with ThreadPoolExecutor(max_workers=max_workers) as e:        for partition_key in list(topic.topics[topic_name].partitions.keys()):            job = e.submit(get_partition_size, topic_name, partition_key)            workers.append(job)    topic_size = sum([w.result() for w in workers])    return topic_sizeprint(get_topic_size('my.kafka.topic'))

大话西游666

一种解决方案是您可以向所有分区各添加一条消息并获取最后的偏移量。根据偏移量,您可以计算到目前为止发送到主题的消息总数。但这不是正确的做法。你不知道消费者已经消费了多少条消息,以及kafka删除了多少条消息。唯一的方法是您可以消费消息并计算数量。

慕虎7371278

没有特定的 API 来计算某个主题的记录数。您需要消费并计算从 kafka 消费者收到的记录数。

富国沪深

我无法使用 来实现此操作kafka-python,但我可以使用confluent-kafka库相当轻松地完成此操作:from confluent_kafka import Consumertopic = "test_topic"broker = "localhost:9092"def get_count():    consumer = Consumer({        'bootstrap.servers': broker,        'group.id': 'my-group',        'auto.offset.reset': 'earliest',    })    consumer.subscribe([topic])    total_message_count = 0    while True:        msg = consumer.poll(1.0)        if msg is None:            print("No more messages")            break        if msg.error():            print("Consumer error: {}".format(msg.error()))            continue        total_message_count = total_message_count + 1        print('Received message {}: {}'.format(total_message_count,     msg.value().decode('utf-8')))    consumer.close()    print(total_message_count)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python