How do I append results in a pipeline using Apache Beam Python?

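I have an Apache Beam pipeline that reads text from input files via Pub/Sub. After some transforms I get each sentence and its score, but my writer overwrites the results instead of appending them. Is there an append mode for beam.io.filesystems?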


from __future__ import absolute_import

import argparse
import logging
from datetime import datetime

from past.builtins import unicode
import json
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.pubsub import WriteToPubSub

from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.textio import ReadFromText, WriteToText


def run(argv=None):
  """Build and run the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='GCS destination folder to save the images to (example: gs://BUCKET_NAME/path)')
  # Exactly one of --input_topic / --input_subscription must be given.
  group = parser.add_mutually_exclusive_group(required=True)
  group.add_argument(
      '--input_topic',
      help=('Input PubSub topic of the form '
            '"projects/<project name>/topics/<topic name>".'))
  group.add_argument(
      '--input_subscription',
      help=('Input PubSub subscription of the form '
            '"projects/<project name>/subscriptions/<subscription name>".'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  pipeline_options.view_as(StandardOptions).streaming = True
  p = beam.Pipeline(options=pipeline_options)


All I get is this:


<sentence n> <score>

I need a small fix. I'm stuck, please help.


慕妹3146593
Views: 179, Answers: 1
1 Answer

白板的微信

For this, you can try beam.io.textio.WriteToText:

    messages = (
        p
        | "Read From PubSub" >> beam.io.ReadFromPubSub(
            subscription=known_args.input_subscription)
        | "Write to GCS" >> beam.io.WriteToText(
            'gs://<your_bucket>/<your_file>',
            file_name_suffix='.txt',
            append_trailing_newlines=True,
            shard_name_template=''))

When you finish the streaming job, this will give you a single file as output. Hope this helps!