Python Kafka 客户端 - 没有错误但无法正常工作

我在 python 中运行 confluent_kafka 客户端。目前,我在尝试生成然后消费消息时没有收到任何错误,但问题是生产者说它成功了,但消费者找不到任何消息。


我创建了一个主题,这是我构建的正在使用的类:


from confluent_kafka import Producer, Consumer

from config import config

import json


class Kafka:

    """

    Kafka Handler.

    """


    def __init__(self, kafka_brokers_sasl, api_key):

        """

        Arguments:

            kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces)

            api_key {str} -- Kafka Api Key

        """


        self.driver_options = {

            'bootstrap.servers': kafka_brokers_sasl,

            'sasl.mechanisms': 'PLAIN',

            'security.protocol': 'SASL_SSL',

            'sasl.username': 'token',

            'sasl.password': api_key,

            'log.connection.close' : False,

            #'debug': 'all'

        }


        self.producer_options = {

            'client.id': 'kafka-python-console-sample-producer'

        }

        self.producer_options.update(self.driver_options)


        self.consumer_options = {

            'client.id': 'kafka-python-console-sample-consumer',

            'group.id': 'kafka-python-console-sample-group'

        }

        self.consumer_options.update(self.driver_options)


        self.running = None



    def stop(self):

        self.running = False



    def delivery_report(self, err, msg):

        """ Called once for each message produced to indicate delivery result.

            Triggered by poll() or flush(). """

        if err is not None:

            print('Message delivery failed: {}'.format(err))

        else:

            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))



    def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic


        p = Producer(self.producer_options)


        print("Running?")



桃花长相依
浏览 478回答 2
2回答

扬帆大鱼

我不是 Python 方面的专家,但看起来您是在生成消息后才开始使用的?kafka.produce(config['kafka']['topic'], json.dumps(mock)) kafka.consume(config['kafka']['topic'])您需要在调用生产函数之前调用消耗函数,因为当您启动一个新消费者时,该消费者的默认偏移量将是最新的。因此,例如,如果您在偏移量 5 处生成了一条消息,然后启动了一个新的消费者,则默认情况下,您的消费者偏移量将在偏移量 6 处,并且不会消耗在偏移量 5 处生成的消息。解决方案是要么在产生任何东西之前开始消费,要么将消费者配置设置为从偏移量的开始消费消息。这可以通过设置auto.offset.reset来完成,earliest但我认为第一个解决方案更简单。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python