在 Python Apache Beam 中使用 value provider 参数的方法

现在我只能使用 ParDo 在类中获取 RunTime 值,是否有另一种方法可以像在我的函数中一样使用运行时参数?


这是我现在得到的代码:


class UserOptions(PipelineOptions):

    @classmethod

    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument('--firestore_document',default='')


def run(argv=None):


    parser = argparse.ArgumentParser()


    pipeline_options = PipelineOptions()


    user_options = pipeline_options.view_as(UserOptions)


    pipeline_options.view_as(SetupOptions).save_main_session = True


    with beam.Pipeline(options=pipeline_options) as p:


        rows = (p 

        | 'Create inputs' >> beam.Create(['']) 

        | 'Call Firestore' >> beam.ParDo(

                CallFirestore(user_options.firestore_document)) 

        | 'Read DB2' >> beam.Map(ReadDB2))

我希望 user_options.firestore_document 可以在其他功能中使用,而无需执行 ParDo


慕田峪4524236
浏览 99回答 1
1回答

LEATH

您可以使用价值提供者的唯一方法是在 ParDos 和 Combines 中。不可能在创建中传递值提供者,但您可以定义一个 DoFn,它返回您在构造函数中传递给它的值提供者:class OutputValueProviderFn(beam.DoFn):  def __init__(self, vp):    self.vp = vp  def process(self, unused_elm):    yield self.vp.get()在您的管道中,您将执行以下操作:user_options = pipeline_options.view_as(UserOptions)with beam.Pipeline(options=pipeline_options) as p:  my_value_provided_pcoll = (      p      | beam.Create([None])      | beam.ParDo(OutputValueProviderFn(user_options.firestore_document))这样你就不会在 Create 中使用它,因为这是不可能的,但你仍然可以在 PCollection 中获得它。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python