猿问

在 go 中使用 Kafka Avro 消息

我正在尝试以 avro 格式使用 Kafka 消息,但我无法在 Go 中将消息从 avro 解码为 json。


我正在使用 Confluent 平台(3.0.1)。例如,我生成 avro 消息,例如:


kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

{"f1":"message1"}

{"f1":"message2"}

现在我使用 go Kafka 库消费消息:sarama。纯文本消息工作正常。Avro 消息必须被解码。我发现了不同的库:github.com/linkedin/goavro、github.com/elodina/go-avro


但是解码后我得到一个没有值的json(两个库):


{"f1":""}

戈夫罗:


avroSchema := `

{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}

`

codec, err := goavro.NewCodec(avroSchema)

if err != nil {

    log.Fatal(err)

}

bb := bytes.NewBuffer(msg.Value)

decoded, err := codec.Decode(bb)

log.Println(fmt.Sprintf("%s", decoded))

继续前进:


schema := avro.MustParseSchema(avroSchema)

reader := avro.NewGenericDatumReader()

reader.SetSchema(schema)

decoder := avro.NewBinaryDecoder(msg.Value)

decodedRecord := avro.NewGenericRecord(schema)

log.Println(decodedRecord.String())

msg = sarama.ConsumerMessage


拉莫斯之舞
浏览 236回答 2
2回答

弑天下

刚刚发现(通过比较二进制 avro 消息)我必须删除消息字节数组的前 5 个元素 - 现在一切正常:)message = msg.Value[5:]也许有人可以解释为什么

忽然笑

仅当您使用 Confluent 模式注册表时才真正有用。
随时随地看视频慕课网APP

相关分类

Go
我要回答