Confluent-Kafka:Avro 序列化与 Python 消费者中的模式处理混淆

我试图了解 Confluent Kafka 上的 Avro 序列化以及模式注册表的使用。直到最后一切都很顺利,但 AVRO 的最终期望让我感到很困惑。根据我的阅读和理解,Avro Serialization 为我们提供了灵活性,当我们更改架构时,我们可以简单地进行管理,而不会影响旧的生产者/消费者。


同样,我开发了一个 python 生成器,它将检查 Schema-Registry 中的 Schema 是否存在,如果不存在,则创建它并开始生成如下所示的 json 消息。当我需要更改架构时,我只需在我的生产者中更新它,这会生成具有新架构的消息。


我的旧架构:


data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

Producer-1 的示例数据:


{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}

我的新架构:


data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'


料青山看我应如是
浏览 287回答 1
1回答

杨魅力

目前尚不清楚您在注册表上使用的兼容性设置,但我将向后假设,这意味着您需要添加一个具有默认值的字段。听起来您正在使用 Python,KeyError因为这些键不存在。相反msg.value()["non-existing-key"],您可以尝试选项 1:将其视为 dict()msg.value().get("non-existing-key", "Default value")选项 2:单独检查所有可能不存在的键some_var = None  # What you want to parseval = msg.value()if "non-existing-key" not in val:    some_var = "Default Value"否则,您必须在旧数据上“投影”新模式,这是 Java 代码通过使用SpecificRecord子类所做的。这样,较旧的数据将使用较新的架构进行解析,该架构具有带有默认值的较新字段。如果你GenericRecord在 Java 中使用,你会遇到类似的问题。我不确定在 Python 中是否有与 Java 的SpecificRecord.顺便说一句,我不认为字符串"None"可以申请logicalType=timestamp
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python