猿问

使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

我正在尝试从GCS存储桶中读取XML文件的集合,并对其进行处理,其中集合中的每个元素都是代表整个文件的字符串,但是我找不到如何实现此目的的示例,我也无法理解它来自 Apache Beam 文档,主要是关于 Java 版本。


我当前的管道如下所示:


p = beam.Pipeline(options=PipelineOptions(pipeline_args))


(p

 | 'Read from a File' >> beam.io.Read(training_files_folder)

 | 'String To BigQuery Row' >> beam.Map(lambda s:

                                        data_ingestion.parse_method(s))

 | 'Write to BigQuery' >> beam.io.Write(

            beam.io.BigQuerySink(

                known_args.output,

                schema='title:STRING,text:STRING,id:STRING',

                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,

                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

p.run().wait_until_finish()

解决了第一个问题:事实证明这不适用于DirectRunner,将运行器更改为DataFlowRunner并将Read替换为ReadFromText可以解决此异常:


p = beam.Pipeline(options=PipelineOptions(pipeline_args))


(p

 | 'Read from a File' >> beam.io.ReadFromText(training_files_folder)

 | 'String To BigQuery Row' >> beam.Map(lambda s:

                                        data_ingestion.parse_method(s))

 | 'Write to BigQuery' >> beam.io.Write(

            beam.io.BigQuerySink(

                known_args.output,

                schema='title:STRING,text:STRING,id:STRING',

                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,

                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

p.run().wait_until_finish() 

但是现在我看到这种方法给了我每个文件中的一行作为管道元素,而我希望将整个文件作为一个字符串作为每个元素。不知道该怎么做。我找到了这篇文章,但它是用 java 编写的,完全不知道它是如何与 python 和 gcs 版本一起工作的。


所以看起来 ReadFromText 对我的用例不起作用,我不知道如何创建文件管道。


解决方案:在Ankur的帮助下,我修改了代码,以包括从MatchResult对象列表转换所需的步骤,这是GCSFileSystem返回到字符串pCollection的内容,每个字符串代表一个文件。


p = beam.Pipeline(options=PipelineOptions(pipeline_args))

gcs = GCSFileSystem(PipelineOptions(pipeline_args))

gcs_reader = GCSFileReader(gcs)



一只名叫tom的猫
浏览 162回答 1
1回答
随时随地看视频慕课网APP

相关分类

Python
我要回答