猿问

通过相关管道处理 Dataflow/Apache Beam 中的拒绝

我有一个从 BigQuery 获取数据并将其写入 GCS 的管道,但是,如果我发现任何拒绝,我想将它们纠正到 Bigquery 表。我将拒绝收集到全局列表变量中,然后将该列表加载到 BigQuery 表中。当我在本地运行该过程时,该过程运行良好,因为管道以正确的顺序运行。当我使用 dataflowrunner 运行它时,它不能保证顺序(我希望 pipeline1 在 pipeline2 之前运行。有没有办法使用 python 在 Dataflow 中拥有依赖的管道?或者还请建议是否可以用更好的方法解决这个问题。提前致谢。


with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:

 

    data = (pipeline1

               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))

               | 'combine output to list' >> beam.combiners.ToList()

               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable

               ....etc

               | 'to gcs' >> beam.io.WriteToText(output)

               )


# Loading the rejects gathered in the above pipeline to Biquery

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:

    rejects = (pipeline2

                    | 'create pipeline' >> beam.Create(reject_list)

                    | 'to json format' >> beam.Map(lambda data: {.....})

                    | 'to bq' >> beam.io.WriteToBigQuery(....)

                    )


繁花不似锦
浏览 1593回答 1
1回答

皈依舞

您可以执行类似的操作,但只需 1 个管道,并在转换中添加一些附加代码。应该beam.Map(lambda x: somefunction)有两个输出:一个被写入 GCS,一个被拒绝的元素最终将被写入 BigQuery。为此,您的转换函数必须返回一个TaggedOutput.第二个PCollection,然后您可以写入 BigQuery。您不需要Create在管道的第二个分支中有一个。管道将是这样的:with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:     data = (pipeline1               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))               | 'combine output to list' >> beam.combiners.ToList()               | 'tranform' >> beam.Map(transform)  # Tagged output produced here    pcoll_to_gcs = data.gcs_output    pcoll_to_bq  = data.rejected    pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)    pcoll_to_bq  | "to bq" >> beam.io.WriteToBigQuery(....)那么transform函数会是这样的def transform(element):  if something_is_wrong_with_element:    yield pvalue.TaggedOutput('rejected', element)  transformed_element = ....  yield pvalue.TaggedOutput('gcs_output', transformed_element)
随时随地看视频慕课网APP

相关分类

Python
我要回答