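I followed this link to create a template that builds a Beam pipeline reading from KafkaIO. But I keep getting "incompatible types: org.apache.beam.sdk.options.ValueProvider cannot be converted to java.lang.String". The line that causes the error is ".withBootstrapServers(options.getKafkaServer())". The Beam version is 2.9.0, and this is part of my code: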
public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();
    void setKafkaServer(ValueProvider<String> value);

    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();
    void setInputTopic(ValueProvider<String> value);

    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();
    void setOutputTopic(ValueProvider<String> value);

    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();
    void setOutput(ValueProvider<String> value);
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);
    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getKafkaServer())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata()
    )
Here is how I run the code:
mvn compile exec:java \
-Dexec.mainClass=${MyClass} \
-Pdataflow-runner -Dexec.args=" \
--project=${MyClass} \
--stagingLocation=gs://${MyBucket}/staging \
--tempLocation=gs://${MyBucket}/temp \
--templateLocation=gs://${MyBucket}/templates/${MyClass} \
--runner=DataflowRunner"
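
To narrow the problem down, the mismatch can be reproduced without Beam. The following is only a minimal sketch: the nested ValueProvider interface is a hypothetical stand-in for org.apache.beam.sdk.options.ValueProvider (it copies just get() and isAccessible()), and withBootstrapServers here is a hypothetical stand-in for a builder method that accepts only a String, which is what the compiler error suggests about the real one. It shows that the wrapper is not a String and that the value has to be unwrapped with get(), which presumably only works when the value is already known at pipeline construction time.

import java.util.Objects;

public class ValueProviderSketch {
    // Hypothetical stand-in for Beam's ValueProvider<T>; not the real interface.
    interface ValueProvider<T> {
        T get();
        boolean isAccessible();
    }

    // Assumption: a provider whose value is known up front,
    // similar in spirit to a statically supplied option.
    static ValueProvider<String> of(final String value) {
        Objects.requireNonNull(value, "value");
        return new ValueProvider<String>() {
            @Override public String get() { return value; }
            @Override public boolean isAccessible() { return true; }
        };
    }

    // Hypothetical stand-in for a builder method that accepts only a String.
    static String withBootstrapServers(String servers) {
        return "bootstrap.servers=" + servers;
    }

    public static void main(String[] args) {
        ValueProvider<String> server = of("localhost:9092");

        // withBootstrapServers(server);
        // ^ does not compile: ValueProvider<String> cannot be converted to String

        // Unwrapping compiles, but only makes sense if the value is accessible now.
        if (server.isAccessible()) {
            System.out.println(withBootstrapServers(server.get()));
        }
    }
}

If that reading is right, declaring the Kafka options as plain String would compile, at the cost of fixing their values when the template is created rather than when it is run.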