我正在尝试从GCS存储桶中读取XML文件的集合,并对其进行处理,其中集合中的每个元素都是代表整个文件的字符串,但是我找不到如何实现此目的的示例,我也无法理解它来自 Apache Beam 文档,主要是关于 Java 版本。
我当前的管道如下所示:
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
(p
| 'Read from a File' >> beam.io.Read(training_files_folder)
| 'String To BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='title:STRING,text:STRING,id:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
解决了第一个问题:事实证明这不适用于DirectRunner,将运行器更改为DataFlowRunner并将Read替换为ReadFromText可以解决此异常:
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
(p
| 'Read from a File' >> beam.io.ReadFromText(training_files_folder)
| 'String To BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='title:STRING,text:STRING,id:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
但是现在我看到这种方法给了我每个文件中的一行作为管道元素,而我希望将整个文件作为一个字符串作为每个元素。不知道该怎么做。我找到了这篇文章,但它是用 java 编写的,完全不知道它是如何与 python 和 gcs 版本一起工作的。
所以看起来 ReadFromText 对我的用例不起作用,我不知道如何创建文件管道。
解决方案:在Ankur的帮助下,我修改了代码,以包括从MatchResult对象列表转换所需的步骤,这是GCSFileSystem返回到字符串pCollection的内容,每个字符串代表一个文件。
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
gcs = GCSFileSystem(PipelineOptions(pipeline_args))
gcs_reader = GCSFileReader(gcs)
相关分类