Serde class for Avro primitive types

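I am writing a Kafka Streams application in Java. It reads input topics created by a connector that uses Schema Registry and Avro for both the key and value converters. The connector produces the following schemas: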


key-schema: "int"

value-schema:
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "firstname", "type": "string"},
    {"name": "lastname", "type": "string"}
  ]
}

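There are actually several topics. The key schema is always "int" and the value schema is always some kind of record (User, Product, and so on). My code contains the following definitions: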


Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);

Serde<User> userSerde = new SpecificAvroSerde<>();
userSerde.configure(serdeConfig, false);

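At first I tried to consume the topic with something like Consumed.with(Serdes.Integer(), userSerde); but that did not work, because Serdes.Integer() expects the integer to be encoded in exactly 4 bytes, whereas Avro uses a variable-length encoding. Using Consumed.with(Serdes.Bytes(), userSerde); worked, but I really wanted an int rather than bytes, so I changed my code to this: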


KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer();
KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keyDeserializer.configure(serdeConfig, true);
keySerializer.configure(serdeConfig, true);
Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);

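This makes the compiler emit a warning (it does not like the (Serde<Integer>)(Serde) cast), but it lets me use

Consumed.with(keySerde, userSerde);

and get an Integer as the key. This works fine and my application behaves as expected (great!!!). But now I want to define default serdes for keys and values, and I cannot get it to work.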
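For reference, the size mismatch between Serdes.Integer() and the Avro-encoded key can be reproduced in plain Java. The sketch below is not Kafka or Confluent code. It assumes the connector writes Confluent's usual framing (one magic byte, a 4-byte schema ID, then the Avro payload), and the class name IntKeyEncodings and the schema ID 7 are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class IntKeyEncodings {

    /** Fixed-width big-endian layout: always exactly 4 bytes. */
    public static byte[] fixedWidth(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Avro binary encoding of an int: zig-zag, then base-128 varint (1 to 5 bytes). */
    public static byte[] avroInt(int value) {
        // Zig-zag maps values of small magnitude (positive or negative) to small codes.
        int n = (value << 1) ^ (value >> 31);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Emit 7 bits at a time, setting the high bit while more bytes follow.
        while ((n & ~0x7F) != 0) {
            out.write((n & 0x7F) | 0x80);
            n >>>= 7;
        }
        out.write(n);
        return out.toByteArray();
    }

    /** Assumed framing: magic byte 0, 4-byte schema ID, then the Avro payload. */
    public static byte[] confluentFramed(int schemaId, int value) {
        byte[] payload = avroInt(value);
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        // 7 is a hypothetical schema ID.
        System.out.println("fixed width:  " + fixedWidth(1).length + " bytes");
        System.out.println("avro int:     " + avroInt(1).length + " bytes");
        System.out.println("framed key:   " + confluentFramed(7, 1).length + " bytes");
    }
}
```

Under these assumptions a key of 1 arrives as 6 bytes rather than 4, which is consistent with a fixed-width integer deserializer rejecting it while Serdes.Bytes() accepts it.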


Setting the default value serde is easy:


streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

But I cannot figure out how to define the default key serde.

I tried:


streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); produces a runtime error: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); produces a runtime error: java.lang.Integer cannot be cast to org.apache.avro.specific.SpecificRecord
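The first error is what one would expect when a class named in configuration is created reflectively: the serde returned by Serdes.serdeFrom(...) wraps two existing instances, so its class offers no public no-argument constructor to call. The plain-Java sketch below illustrates that mechanism only. It is not Kafka's actual code, and NoArgConstructorCheck, WrapperLike and Configurable are hypothetical stand-ins.

```java
final class NoArgConstructorCheck {

    /** Stand-in for a wrapper serde: it can only be built from two existing instances. */
    public static class WrapperLike {
        private final Object serializer;
        private final Object deserializer;

        public WrapperLike(Object serializer, Object deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    /** Stand-in for a serde that can be created first and configured afterwards. */
    public static class Configurable {
        public Configurable() {
        }
    }

    /** Creates an instance from a class alone, as a config-driven framework would. */
    public static Object instantiate(Class<?> type) {
        try {
            // getConstructor() only finds public constructors with exactly these parameters (none).
            return type.getConstructor().newInstance();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(
                    "Could not find a public no-argument constructor for " + type.getName(), e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(instantiate(Configurable.class).getClass().getSimpleName());
        try {
            instantiate(WrapperLike.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The second error is consistent with SpecificAvroSerde being meant for generated record classes: an "int" key is deserialized to a java.lang.Integer, which is not a SpecificRecord. A default key serde therefore needs both a public no-argument constructor and a deserializer that can return primitives.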

What am I missing? Thanks.


芜湖不芜

翻翻过去那场雪

I would like to post the solution that worked for me. Feel free to improve it.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class GenericPrimitiveAvroSerDe<T> implements Serde<T> {

    private final Serde<Object> inner;

    /**
     * Constructor used by Kafka Streams.
     */
    public GenericPrimitiveAvroSerDe() {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }

    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) {
        this(client, Collections.emptyMap());
    }

    public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) {
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props));
    }

    @Override
    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
        // Pass the same configuration (including schema.registry.url) to both sides.
        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
    }

    @Override
    public void close() {
        // Close both wrapped (de)serializers.
        inner.serializer().close();
        inner.deserializer().close();
    }

    @SuppressWarnings("unchecked")
    @Override
    public Serializer<T> serializer() {
        // The wrapped serializer handles Object; the unchecked cast narrows it to T.
        Object obj = inner.serializer();
        return (Serializer<T>) obj;
    }

    @SuppressWarnings("unchecked")
    @Override
    public Deserializer<T> deserializer() {
        // The wrapped deserializer returns Object; the unchecked cast narrows it to T.
        Object obj = inner.deserializer();
        return (Deserializer<T>) obj;
    }
}

Use it as the default in the Streams configuration:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericPrimitiveAvroSerDe.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericPrimitiveAvroSerDe.class);

To override the defaults:

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081");

final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys

final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values