Beam Python Dataflow Runner 在 apply_WriteToBig

在DataflowRunner内部的实现细节方面,很多人可能并不关心是否BigQuerySink使用WriteToBigQuery。


但是,在我的情况下,我试图将我的代码部署到带有 RunTimeValueProvider 的参数的数据流模板。支持此行为WriteToBigQuery:


class WriteToBigQuery(PTransform):

....


 table (str, callable, ValueProvider): The ID of the table, or a callable

         that returns it. The ID must contain only letters ``a-z``, ``A-Z``,

         numbers ``0-9``, or underscores ``_``. If dataset argument is

         :data:`None` then the table argument must contain the entire table

         reference specified as: ``'DATASET.TABLE'``

         or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one

         argument representing an element to be written to BigQuery, and return

         a TableReference, or a string table name as specified above.

         Multiple destinations are only supported on Batch pipelines at the

         moment.

BigQuerySink不支持它:


class BigQuerySink(dataflow_io.NativeSink):

      table (str): The ID of the table. The ID must contain only letters

        ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If

        **dataset** argument is :data:`None` then the table argument must

        contain the entire table reference specified as: ``'DATASET.TABLE'`` or

        ``'PROJECT:DATASET.TABLE'``.

更有趣的是,BigQuerySink在代码中从 2.11.0 开始就被弃用了。


@deprecated(since='2.11.0', current="WriteToBigQuery")

但是在 DataFlowRunner 中,当前的代码和注释似乎完全不符合预期WriteToBigQuery使用 over 的默认类BigQuerySink:


  def apply_WriteToBigQuery(self, transform, pcoll, options):

    # Make sure this is the WriteToBigQuery class that we expected, and that

    # users did not specifically request the new BQ sink by passing experiment

    # flag.

我的问题是双重的:

  1. DataflowRunner为什么类和类之间的合同/期望存在差异io.BigQuery

  2. 无需等待错误修复,有人对如何强制使用DataflowRunneroverWriteToBigQuery有建议BigQuerySink吗?


婷婷同学_
浏览 131回答 1
1回答

偶然的你

转换有两种不同的WriteToBigQuery写入 BigQuery 的策略:将插入流式传输到 BigQuery 端点定期触发文件加载作业(或批处理管道一次)对于 Python SDK,我们最初只支持流式插入,并且我们有一个只在 Dataflow 上工作的文件加载的 runner-native 实现(这是BigQuerySink.对于在 Dataflow 上运行的批处理管道,BigQuerySink替换为 - 如您正确发现的那样。对于所有其他情况,使用流式插入。在最新版本的 Beam 中,我们在 SDK 中原生添加了对文件加载的支持 - 实现在BigQueryBatchFileLoads.因为我们不想破坏依赖旧行为的用户,所以我们隐藏BigQueryBatchFileLoads了一个实验标志。(标志是use_beam_bq_sink)。所以:在未来的版本中,我们将自动使用BigQueryBatchFileLoads,但目前,您有两个选项可以访问它:直接在您的管道中使用它(例如input | BigQueryBatchFileLoads(...))。通过选项--experiments use_beam_bq_sink,同时使用WriteToBigQuery。我希望这会有所帮助!
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python