如何使用 Benthos 读取和解码来自 Kafka 的 AVRO 消息及其关联的

我正在使用 Benthos 从 Kafka 读取 AVRO 编码的消息,其中kafka_key元数据字段设置为还包含 AVRO 编码的有效负载。这些 AVRO 编码的有效载荷的模式存储在模式注册表中,Benthos 有一个schema_registry_decode用于解码它们的处理器。我希望为每个包含两个字段的 Kafka 消息生成输出 JSON 消息,一个称为content包含解码的 AVRO 消息,另一个称为包含Benthos 收集metadata的各种元数据字段kafka_key,包括解码的有效负载。



倚天杖
浏览 157回答 1
1回答

隔江千里

事实证明,可以使用这样的branch处理器来实现这一点:input:  kafka:    addresses:      - localhost:9092    consumer_group: benthos_consumer_group    topics:      - benthos_inputpipeline:  processors:    # Decode the message    - schema_registry_decode:        url: http://localhost:8081    # Populate output content field    - bloblang: |        root.content = this    # Decode kafka_key metadata payload and populate output metadata field    - branch:        request_map: |          root = meta("kafka_key")        processors:          - schema_registry_decode:              url: http://localhost:8081        result_map: |          root.metadata = meta()          root.metadata.kafka_key = thisoutput:  stdout: {}
打开App,查看更多内容
随时随地看视频慕课网APP