Reading CSV files with Apache Beam and writing them to BigQuery

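I have a GCS bucket from which I'm trying to read about 200,000 files and then write them to BigQuery. The problem is that I can't create a PCollection that works with my code. I'm following this tutorial for reference.

I have this code: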


from __future__ import absolute_import

import argparse
import logging
import os

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage

import regex as re

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
files = ['gs://mybucket/_chunk1',
         'gs://mybucket/_chunk0']


class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept."""

    def parse_method(self, string_input):
        x = """{}""".format(string_input)
        rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2'] = rx.split(x)
        d['geometry'] = d['geometry'].strip('"')

        return d


def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())

    (p
     | 'Create PCollection' >> beam.Create(files)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
     | 'String To BigQuery Row' >> beam.Map(
         lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             'mytable',


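The problem is that this code works perfectly if the files list has only one element. As soon as it has more than one element, the 'String To BigQuery Row' transform fails with error: nothing to repeat [while running 'String To BigQuery Row']. This may have something to do with the regex module, but I can't figure out what is wrong, since it works perfectly when given a single file.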




胡说叔叔
1 answer

互换的青春

The OP's comment made me realize my mistake: the intended library is regex, not Python's built-in re.

Using import regex as re not only confused me, it is also what causes the re library to raise the nothing to repeat error. This is because Dataflow does not save your main session by default. When the code in your parse function runs, it has no access to the context in which you imported re at construction time. Normally this would fail with a NameError, but because you are using a valid library name, the code assumes you mean the built-in re library and tries to run the pattern with it.

If you use import regex instead, you will see NameError: name 'regex' is not defined, which is the real reason the code fails. To fix this, either move the import statement into the parse function itself, or pass --save_main_session as an option to the runner. See here for more details.

Old answer:

Although I don't know which version of Python you are using, your suspicion about the regular expression seems correct. * is a special character that repeats whatever precedes it, but ( is a special character that opens a group, so a pattern like (*SKIP) does not look syntactically valid.

In Python 3.7, the expression above does not even compile:

    python -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 234, in compile
        return _compile(pattern, flags)
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 286, in _compile
        p = sre_compile.compile(pattern, flags)
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_compile.py", line 764, in compile
        p = sre_parse.parse(p, flags)
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 930, in parse
        p = _parse_sub(source, pattern, flags & SRE_FLAG_VERBOSE, 0)
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
        not nested and not items))
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 816, in _parse
        p = _parse_sub(source, state, sub_verbose, nested + 1)
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
        not nested and not items))
      File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 651, in _parse
        source.tell() - here + len(this))
    re.error: nothing to repeat at position 11

Python 2.7.15 does not accept it either:

    python2 -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/usr/lib/python2.7/re.py", line 194, in compile
        return _compile(pattern, flags)
      File "/usr/lib/python2.7/re.py", line 251, in _compile
        raise error, v # invalid expression
    sre_constants.error: nothing to repeat

Although I don't know which strings you are trying to match, I suspect some of your characters need to be escaped, for example "\{[^{}]+\}(\*SKIP)(\*FAIL)|,".
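As a rough sketch of the first option (moving the import into the parse function), something like the following might work. The function name parse_line and the sample line are made up for illustration, and the standard-library fallback is my own assumption for environments where regex is not installed; it is not part of the original code.

```python
def parse_line(line):
    """Split one CSV line into the six fields used in the question."""
    # Importing inside the function binds the name wherever the function
    # actually runs, independent of what the main module imported.
    try:
        import regex  # third-party module the question intended to use
        splitter = regex.compile(r"\{[^{}]+\}(*SKIP)(*FAIL)|,")
    except ImportError:
        # Assumption (not from the question): without regex, approximate
        # the split with the standard library by not splitting on commas
        # that are followed by a closing brace with no brace in between.
        import re
        splitter = re.compile(r",(?![^{}]*\})")

    keys = ("name", "date", "geometry", "value0", "value1", "value2")
    values = splitter.split(line)
    if len(values) != len(keys):
        raise ValueError("expected %d fields, got %d" % (len(keys), len(values)))

    row = dict(zip(keys, values))
    row["geometry"] = row["geometry"].strip('"')
    return row


if __name__ == "__main__":
    sample = 'n,2019-01-01,"{x: 1, y: 2}",1,2,3'  # made-up input line
    print(parse_line(sample))
```

In the pipeline this would take the place of the lambda, for example beam.Map(parse_line). Compiling the pattern on every call is wasteful for 200,000 files, so in real code you would probably want to cache the compiled pattern. If you prefer the second option, I believe the in-code equivalent of the flag is setting save_main_session to True on the SetupOptions view of your PipelineOptions, which the question already imports.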