猿问

如何修复“不兼容的类型:

我按照此链接创建了一个模板,该模板构建了一个从 KafkaIO 读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider 无法转换为 java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。Beam 版本是 2.9.0,这是我的代码的一部分。


public interface Options extends PipelineOptions {

    @Description("Kafka server")

    @Required

    ValueProvider<String> getKafkaServer();


    void setKafkaServer(ValueProvider<String> value);


    @Description("Topic to read from")

    @Required

    ValueProvider<String> getInputTopic();


    void setInputTopic(ValueProvider<String> value);


    @Description("Topic to write to")

    @Required

    ValueProvider<String> getOutputTopic();


    void setOutputTopic(ValueProvider<String> value);


    @Description("File path to write to")

    @Required

    ValueProvider<String> getOutput();


    void setOutput(ValueProvider<String> value);

}


public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    Pipeline p = Pipeline.create(options);


    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()

            .withBootstrapServers(options.getKafkaServer())

            .withTopic(options.getInputTopic())

            .withKeyDeserializer(LongDeserializer.class)

            .withValueDeserializer(StringDeserializer.class)

            .withoutMetadata() 

    )

以下是我运行代码的方式:


mvn compile exec:java \

-Dexec.mainClass=${MyClass} \

-Pdataflow-runner -Dexec.args=" \

--project=${MyClass} \

--stagingLocation=gs://${MyBucket}/staging \

--tempLocation=gs://${MyBucket}/temp \

--templateLocation=gs://${MyBucket}/templates/${MyClass} \

--runner=DataflowRunner"


智慧大石
浏览 150回答 2
2回答

BIG阳

为了通过 访问值ValueProvider,您需要使用该get方法,然后获取具有具体类型的值。例如:当有选项时:ValueProvider<String> getKafkaServer();您可以通过以下方式访问它:getKafkaServer().get()这将返回您的 String 对象。似乎 KafkaIo Api 需要获取字符串参数而不是 ValueProvider,您必须从 ValueProvider 包装器中提取值。

慕标5832272

我可能会发现问题,即不支持 kafkaIO。以下来自谷歌创建模板。" 一些 I/O 连接器包含接受 ValueProvider 对象的方法。要确定对特定连接器和方法的支持,请参阅 I/O 连接器的 API 参考文档。支持的方法具有 ValueProvider 的重载。如果方法没有重载,该方法不支持运行时参数。以下 I/O 连接器至少有部分 ValueProvider 支持:基于文件的 IO:TextIO、AvroIO、FileIO、TFRecordIO、XmlIO BigQueryIO* BigtableIO(需要 SDK 2.3.0 或更高版本)PubSubIO SpannerIO "
随时随地看视频慕课网APP

相关分类

Java
我要回答