主要目标是聚合两个 Kafka 主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。
我已经能够在简单的场景中使用消息,例如使用以下内容的 KV (Long,String):
PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long,
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
PCollection<String> output = input.apply(Values.<String>create());
但这似乎不是当您需要从 AVRO 反序列化时的方法。我有一个需要消耗的 KV(STRING, AVRO)。
我尝试从 AVRO 模式生成 Java 类,然后将它们包含在“应用”中,例如:
PCollection<MyClass> output = input.apply(Values.<MyClass>create());
但这似乎不是正确的方法。
是否有任何人可以指出我的文档/示例,以便我了解您将如何使用 Kafka AVRO 和 Beam?
我已经更新了我的代码:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
public class Main {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
);
p.run();
}
}
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;
Myclass(){}
Myclass(String n, String a) {
this.name= n;
this.age= a;
}
}
但我现在收到以下错误
incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >
我必须导入不正确的序列化程序?
SMILET
小唯快跑啊
Cats萌萌
慕斯709654
相关分类