猿问

Dataflow BigQuery 插入作业因大数据集而立即失败

我使用梁 python 库设计了一个梁/数据流管道。管道大致执行以下操作:

  1. ParDo:从 API 收集 JSON 数据

  2. ParDo:转换 JSON 数据

  3. I/O:将转换后的数据写入 BigQuery 表

一般来说,代码会做它应该做的事情。但是,当从 API 收集一个大数据集(大约 500.000 个 JSON 文件)时,bigquery 插入作业在使用 DataflowRunner(它与在我的计算机)。使用较小的数据集时,一切正常。

数据流日志如下:

2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the...

Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105". 

2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /Wr...

Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 

beamapp-X-04212005-04211305-sf4k-harness-lqjg,

beamapp-X-04212005-04211305-sf4k-harness-lgg2,

beamapp-X-04212005-04211305-sf4k-harness-qn55,

beamapp-X-04212005-04211305-sf4k-harness-hcsn

按照建议使用 bq cli 工具来获取有关 BQ 加载作业的更多信息不起作用。找不到工作(我怀疑它是否已经创建,因为即时失败)。


我想我遇到了某种配额/bq 限制,甚至是内存不足问题(请参阅:https ://beam.apache.org/documentation/io/built-in/google-bigquery/ )


限制 BigQueryIO 目前有以下限制。


您无法使用 >您的管道的其他步骤对 BigQuery 写入的完成进行排序。


如果您使用的是 Beam SDK for Python,如果您编写了一个非常大的数据集,您可能会遇到导入大小配额>问题。作为一种解决方法,您可以对数据集进行分区(例如,使用 Beam 的分区转换)并写入多个 BigQuery 表。Beam SDK for Java 没有这个>限制,因为它会为您划分数据集。


对于如何缩小此问题的根本原因,我将不胜感激。


我还想尝试分区 Fn,但没有找到任何 python 源代码示例如何将分区 pcollection 写入 BigQuery 表。


倚天杖
浏览 173回答 2
2回答

宝慕林4294392

可能有助于调试的一件事是查看 Stackdriver 日志。如果您在 Google控制台中打开 Dataflow 作业并单击LOGS图形面板右上角的 ,则应该会打开底部的日志面板。LOGS 面板的右上角有一个指向 Stackdriver 的链接。这将为您提供有关您的工人/洗牌/等的大量日志信息。对于这个特定的工作。其中有很多内容,并且很难过滤掉相关的内容,但希望您能够找到比A work item was attempted 4 times without success. 例如,每个工作人员偶尔会记录它正在使用的内存量,这可以与每个工作人员拥有的内存量(基于机器类型)进行比较,以查看它们是否确实内存不足,或者您的错误是否正在发生别处。祝你好运!

富国沪深

据我所知,在 Cloud Dataflow 和 Apache Beam 的 Python SDK 中没有可用的选项来诊断 OOM(Java SDK 可以)。我建议您在Cloud Dataflow 问题跟踪器中打开功能请求,以获取有关此类问题的更多详细信息。除了检查 Dataflow 作业日志文件之外,我建议您使用Stackdriver Monitoring 工具监控您的管道,该工具提供每个作业的资源使用情况(作为总内存使用时间)。关于 Python SDK 中的 Partition 函数使用,以下代码(基于 Apache Beam文档中提供的示例)将数据拆分为 3 个 BigQuery 加载作业:def partition_fn(input_data, num_partitions):      return int(get_percentile(lines) * num_partitions / 100)    partition = input_data | beam.Partition(partition_fn, 3)    for x in range(3):      partition[x] | 'WritePartition %s' % x >> beam.io.WriteToBigQuery(        table_spec,        schema=table_schema,        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
随时随地看视频慕课网APP

相关分类

Python
我要回答