-
慕哥9229398
The main idea is to count how many messages are in each partition of the topic and sum those numbers. The result is the total number of messages in the topic. I use confluent_kafka as the main library.

    from concurrent.futures import ThreadPoolExecutor

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({"bootstrap.servers": "localhost:6667", "group.id": "test"})


    def get_partition_size(topic_name: str, partition_key: int) -> int:
        # Size of one partition = high watermark offset - low watermark offset.
        topic_partition = TopicPartition(topic_name, partition_key)
        low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
        return high_offset - low_offset


    def get_topic_size(topic_name: str) -> int:
        # Fetch the topic metadata to discover its partitions.
        topic = consumer.list_topics(topic=topic_name)
        partitions = topic.topics[topic_name].partitions
        workers, max_workers = [], len(partitions) or 1

        # Query the watermark offsets of every partition in parallel.
        with ThreadPoolExecutor(max_workers=max_workers) as e:
            for partition_key in partitions:
                job = e.submit(get_partition_size, topic_name, partition_key)
                workers.append(job)

        # Sum the per-partition sizes to get the topic size.
        return sum(w.result() for w in workers)


    print(get_topic_size('my.kafka.topic'))
-
大话西游666
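One solution is to add one message to each partition and read the last offset. From those offsets you can calculate the total number of messages sent to the topic so far. But this is not the right approach: you don't know how many messages consumers have already consumed, or how many messages Kafka has deleted. The only way is to consume the messages and count them.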
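To make the offset arithmetic concrete, here is a minimal sketch in plain Python. The watermark values are made-up numbers, not output from a real cluster, and `offsets_assigned` and `estimate_from_watermarks` are hypothetical helper names. The last offset alone counts every offset ever assigned in a partition; subtracting the low watermark leaves out records that retention has already removed. The result is still only an estimate, because offsets can be skipped, for example on compacted topics or by transaction markers.

```python
from typing import Dict, Tuple

# Hypothetical (low, high) watermark offsets per partition (made-up numbers).
WATERMARKS: Dict[int, Tuple[int, int]] = {
    0: (100, 250),  # offsets 0..99 were already removed by retention
    1: (0, 40),     # nothing removed yet
    2: (75, 75),    # low == high: the partition currently holds no records
}


def offsets_assigned(watermarks: Dict[int, Tuple[int, int]]) -> int:
    """Sum of the high watermarks: every offset ever assigned, deleted or not."""
    return sum(high for _, high in watermarks.values())


def estimate_from_watermarks(watermarks: Dict[int, Tuple[int, int]]) -> int:
    """Sum of (high - low): offsets still inside the retained range."""
    return sum(high - low for low, high in watermarks.values())


print(offsets_assigned(WATERMARKS))          # 250 + 40 + 75 = 365
print(estimate_from_watermarks(WATERMARKS))  # 150 + 40 + 0 = 190
```

If an exact count matters, consuming and counting remains the fallback, as the answer says.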
-
慕虎7371278
There is no specific API for counting the records in a topic. You need to consume the topic and count the records received from the Kafka consumer.
-
富国沪深
I couldn't do this with kafka-python, but I was able to do it fairly easily with the confluent-kafka library:

    from confluent_kafka import Consumer

    topic = "test_topic"
    broker = "localhost:9092"


    def get_count():
        consumer = Consumer({
            'bootstrap.servers': broker,
            'group.id': 'my-group',
            # Start from the beginning when the group has no committed offsets.
            'auto.offset.reset': 'earliest',
        })
        consumer.subscribe([topic])

        total_message_count = 0
        while True:
            msg = consumer.poll(1.0)

            # A poll that times out is treated as "topic exhausted".
            if msg is None:
                print("No more messages")
                break
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            total_message_count = total_message_count + 1
            print('Received message {}: {}'.format(total_message_count, msg.value().decode('utf-8')))

        consumer.close()
        print(total_message_count)
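One caveat with the loop above: a single `poll` timeout does not necessarily mean the topic is exhausted, since early polls can return `None` while the consumer is still joining its group. Below is a hedged sketch of a variant that stops only after several consecutive empty polls. `count_until_idle`, `max_empty_polls`, `FakeMessage`, and `FakeConsumer` are hypothetical names introduced for illustration; the function only relies on `poll(timeout)` and `msg.error()`, which is the part of the confluent_kafka `Consumer` interface the original code already uses. The fake classes exist so the sketch runs without a broker.

```python
def count_until_idle(consumer, poll_timeout=1.0, max_empty_polls=5):
    """Count messages until `max_empty_polls` consecutive polls return None."""
    count = 0
    empty_polls = 0
    while empty_polls < max_empty_polls:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            empty_polls += 1  # nothing arrived within the timeout
            continue
        if msg.error():
            continue          # skip error events, as in the original loop
        empty_polls = 0       # a real message resets the idle counter
        count += 1
    return count


class FakeMessage:
    """Stand-in for a Kafka message (assumption: only error() is needed)."""

    def __init__(self, err=None):
        self._err = err

    def error(self):
        return self._err


class FakeConsumer:
    """Stand-in consumer that replays a fixed list of poll results."""

    def __init__(self, events):
        self._events = list(events)

    def poll(self, timeout):
        return self._events.pop(0) if self._events else None


# Two empty polls at the start no longer end the count prematurely.
fake = FakeConsumer([None, None, FakeMessage(), FakeMessage(), None, FakeMessage()])
print(count_until_idle(fake, poll_timeout=0.0, max_empty_polls=3))
```

With a real consumer, passing the subscribed `Consumer` instance instead of `FakeConsumer` should work the same way, at the cost of waiting `poll_timeout * max_empty_polls` seconds after the last message.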