我在 python 中运行 confluent_kafka 客户端。目前,我在尝试生成然后消费消息时没有收到任何错误,但问题是生产者说它成功了,但消费者找不到任何消息。
我创建了一个主题,这是我构建的正在使用的类:
from confluent_kafka import Producer, Consumer
from config import config
import json
class Kafka:
"""
Kafka Handler.
"""
def __init__(self, kafka_brokers_sasl, api_key):
"""
Arguments:
kafka_brokers_sasl {str} -- String containing kafka brokers separated by comma (no spaces)
api_key {str} -- Kafka Api Key
"""
self.driver_options = {
'bootstrap.servers': kafka_brokers_sasl,
'sasl.mechanisms': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': 'token',
'sasl.password': api_key,
'log.connection.close' : False,
#'debug': 'all'
}
self.producer_options = {
'client.id': 'kafka-python-console-sample-producer'
}
self.producer_options.update(self.driver_options)
self.consumer_options = {
'client.id': 'kafka-python-console-sample-consumer',
'group.id': 'kafka-python-console-sample-group'
}
self.consumer_options.update(self.driver_options)
self.running = None
def stop(self):
self.running = False
def delivery_report(self, err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
def produce(self, topic, data): # Function for producing/uploading data to a Kafka topic
p = Producer(self.producer_options)
print("Running?")
扬帆大鱼
相关分类