我有一个 GCS 存储桶,我试图从中读取大约 20 万个文件,然后将它们写入 BigQuery。问题是我无法创建与代码配合良好的 PCollection。我正在关注本教程以供参考。
我有这个代码:
from __future__ import absolute_import
import argparse
import logging
import os
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage
import regex as re
# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
files = ['gs://mybucket/_chunk1',
'gs://mybucket/_chunk0']
class DataIngestion:
"""A helper class which contains the logic to translate the file into
a format BigQuery will accept."""
def parse_method(self, string_input):
x="""{}""".format(string_input)
rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
d = {}
d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2']=rx.split(x)
d['geometry']=d['geometry'].strip('"')
return d
def run(argv=None):
"""Main entry point; defines and runs the pipeline."""
data_ingestion = DataIngestion()
p = beam.Pipeline(options=PipelineOptions())
(p
| 'Create PCollection' >> beam.Create(files)
| 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
| 'String To BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
'mytable',
files问题是如果列表只有一个元素,这段代码就可以完美运行。只要有超过 1 个元素,转换“String To BigQuery Row”就会出错并显示error: nothing to repeat [while running 'String To BigQuery Row']。这可能与正则表达式模块有关,但我无法弄清楚出了什么问题,因为它在给定 1 个文件时可以完美运行。
胡说叔叔
互换的青春
随时随地看视频慕课网APP
相关分类