Deserializing Kafka AVRO messages with Apache Beam

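The main goal is to aggregate two Kafka topics: one carries compacted, slow-moving data, and the other carries fast-moving data that arrives every second.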


I have been able to consume messages in simple scenarios, such as a KV (Long, String), using the following:


PCollection<KV<Long, String>> input = p.apply(KafkaIO.<Long, String>read()
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata()); // drop the Kafka metadata so the result is KV<Long, String>

PCollection<String> output = input.apply(Values.<String>create());

But this does not seem to be the right approach when you need to deserialize from AVRO. I have a KV (String, AVRO) that I need to consume.


I tried generating Java classes from the AVRO schema and then including them in the "apply", for example:


PCollection<MyClass> output = input.apply(Values.<MyClass>create());

But this does not seem to be the right approach either.


Can anyone point me to documentation or examples that show how to use Kafka AVRO with Beam?


I have updated my code:


import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
        );

        p.run();
    }
}

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass {
    String name;
    String age;

    Myclass() {}

    Myclass(String n, String a) {
        this.name = n;
        this.age = a;
    }
}

But I now get the following error:


incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >

Am I importing the wrong deserializer?


慕村225694
4 answers

SMILET

I ran into the same problem and found the solution in this mailing list archive: http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E

In your case, you need to define your own Deserializer<MyClass>, which can extend AbstractKafkaAvroDeserializer as follows:

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
        implements Deserializer<MyClass> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public MyClass deserialize(String s, byte[] bytes) {
        // Delegates to the untyped deserialize(byte[]) of the base class, then casts.
        return (MyClass) this.deserialize(bytes);
    }

    @Override
    public void close() {}
}

Then specify your custom KafkaAvroDeserializer as the value deserializer:

p.apply(KafkaIO.<Long, MyClass>read()
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(MyClassKafkaAvroDeserializer.class)
);
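One caveat about the cast in deserialize: it only succeeds if the underlying deserializer really produces a MyClass instance. As far as I know, with the Confluent deserializer that requires MyClass to be an Avro-generated class and the consumer property specific.avro.reader to be set to true; otherwise a generic record comes back and the cast fails at runtime. The sketch below illustrates the pattern using only the JDK. Person, UntypedDecoder and TypedPersonDecoder are hypothetical stand-ins, not Kafka or Confluent classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

// Hypothetical value type standing in for the generated MyClass.
class Person {
    final String name;

    Person(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for a base class whose decode method returns Object.
class UntypedDecoder {
    private final boolean specific;

    UntypedDecoder(boolean specific) {
        this.specific = specific;
    }

    Object decode(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        // "specific" mode yields the typed class; otherwise a generic, map-like record.
        return specific ? new Person(text) : Collections.singletonMap("name", text);
    }
}

// Typed wrapper: delegates to the untyped decode and checks the type before casting.
class TypedPersonDecoder extends UntypedDecoder {
    TypedPersonDecoder(boolean specific) {
        super(specific);
    }

    Person decodeTyped(byte[] payload) {
        Object value = decode(payload);
        if (!(value instanceof Person)) {
            throw new IllegalStateException(
                "expected Person but got " + value.getClass().getName());
        }
        return (Person) value;
    }
}

class TypedWrapperDemo {
    public static void main(String[] args) {
        byte[] payload = "Ann".getBytes(StandardCharsets.UTF_8);
        System.out.println(new TypedPersonDecoder(true).decodeTyped(payload).name);
        try {
            new TypedPersonDecoder(false).decodeTyped(payload);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking the type before casting turns a bare ClassCastException into a message that names the unexpected class, which makes a missing configuration setting easier to spot.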

小唯快跑啊

You can use KafkaAvroDeserializer as follows:

PCollection<KV<Long, MyClass>> input = p.apply(KafkaIO.<Long, String>read()
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class)));

where MyClass is the POJO class generated from the Avro schema. Make sure your POJO class carries the AvroCoder annotation, as in the example below:

@DefaultCoder(AvroCoder.class)
public class MyClass {
    String name;
    String age;

    MyClass() {}

    MyClass(String n, String a) {
        this.name = n;
        this.age = a;
    }
}

Cats萌萌

I ran into a similar problem today and came across the following example, which solved it for me: https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java

The part I was missing was the (Class) cast on KafkaAvroDeserializer:

KafkaIO.<String, MyClass>read()
    .withBootstrapServers("kafka:9092")
    .withTopic("dbserver1.inventory.customers")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
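A rough sketch of why the cast helps, using only the JDK: KafkaAvroDeserializer is declared as a Deserializer<Object>, so its class literal does not satisfy a parameter bounded by Deserializer<V> for any other V, which is what the compiler error in the question reports. Casting to the raw Class type switches that check to an unchecked conversion. The Deserializer interface, ObjectDeserializer and describe below are hypothetical stand-ins for the Kafka and Beam types, not the real APIs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer<T>.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Stand-in for a deserializer declared against Object.
class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

class RawCastDemo {
    // Same shape of bound as the parameter named in the compiler error.
    static <V> String describe(Class<? extends Deserializer<V>> type) {
        return type.getSimpleName();
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static String viaRawCast() {
        // RawCastDemo.<String>describe(ObjectDeserializer.class);
        // ^ rejected by the compiler: Deserializer<Object> is not a Deserializer<String>.

        // The raw cast compiles, with an unchecked warning instead of an error.
        return RawCastDemo.<String>describe((Class) ObjectDeserializer.class);
    }

    public static void main(String[] args) {
        System.out.println(viaRawCast());
    }
}
```

The trade-off is that the compiler no longer verifies the value type. If the deserializer returns something other than the declared class at runtime, the failure surfaces later, for example when the coder tries to encode the value.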

慕斯709654

I also found that this works:

import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
...
public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass> {}
...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...

where MyCustomClass is the code generated with the Avro tools.
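The empty subclass is doing real work here: it pins the type parameter to a concrete class, so the class literal satisfies the typed bound without any cast. A minimal JDK-only sketch of that idea follows. RecordDeserializer, BaseDeserializer, Customer and CustomerDeserializer are hypothetical stand-ins, not the Confluent classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a typed deserializer interface.
interface RecordDeserializer<T> {
    T deserialize(byte[] data);
}

// Stand-in for a generic base class such as SpecificAvroDeserializer<T>.
class BaseDeserializer<T> implements RecordDeserializer<T> {
    @Override
    public T deserialize(byte[] data) {
        return null; // real decoding is omitted in this sketch
    }
}

// Hypothetical value class standing in for the generated record type.
class Customer {}

// Empty subclass whose only job is to fix T = Customer.
class CustomerDeserializer extends BaseDeserializer<Customer> {}

class SubclassBindingDemo {
    // Accepts only class literals whose deserializer type argument is known.
    static <V> String requireDeserializerOf(Class<? extends RecordDeserializer<V>> type) {
        return type.getSimpleName();
    }

    // The binding is also recorded in class metadata and visible via reflection.
    static Type boundValueType() {
        ParameterizedType parent =
            (ParameterizedType) CustomerDeserializer.class.getGenericSuperclass();
        return parent.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // Compiles without a cast; V is inferred as Customer.
        System.out.println(requireDeserializerOf(CustomerDeserializer.class));
        System.out.println(boundValueType());
    }
}
```

Compared with a raw (Class) cast, this keeps the compiler's type check intact at the cost of one extra class per value type.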