I'm trying to create a Dataflow template that takes its input parameter as a RuntimeValue, following the example in the documentation:
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

# [START example_wordcount_templated]
class WordcountTemplatedOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Use add_value_provider_argument for arguments to be templatable
        # Use add_argument as usual for non-templatable arguments
        parser.add_value_provider_argument(
            '--input', help='Path of the file to read from')
        parser.add_argument(
            '--output', required=True, help='Output file to write results to.')

pipeline_options = PipelineOptions(['--output', 'some/output_path'])
with beam.Pipeline(options=pipeline_options) as p:
    wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
    lines = p | 'Read' >> ReadFromText(wordcount_options.input)
# [END example_wordcount_templated]
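(This is taken directly from the official snippet.) When I try to create the template with the following command (with my details filled in), I get the error below: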
python -m examples.mymodule \
--runner DataflowRunner \
--project YOUR_PROJECT_ID \
--staging_location gs://YOUR_BUCKET_NAME/staging \
--temp_location gs://YOUR_BUCKET_NAME/temp \
--template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
  File "lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 139, in _f
    raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: None) not accessible
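One possibility worth checking (an assumption, not verified here): the command passes --runner, --project and --template_location, but the snippet builds its options as PipelineOptions(['--output', 'some/output_path']). PipelineOptions parses only the list it is given and falls back to sys.argv[1:] only when no list is passed, so those command-line flags may never reach the pipeline, which would then run on the default DirectRunner instead of producing a template. The stdlib-only sketch below mimics that behavior with argparse; parse_options and the chosen flag subset are hypothetical stand-ins, not Beam's own parser.

```python
import argparse


def parse_options(flags):
    """Parse a hypothetical subset of the flags used in the command above."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--runner")  # when unset, Beam itself falls back to DirectRunner
    parser.add_argument("--project")
    parser.add_argument("--template_location")
    parser.add_argument("--output")
    # parse_known_args collects flags the parser does not define instead of failing.
    known, _unknown = parser.parse_known_args(flags)
    return known


# Flags as passed on the command line (placeholders copied from the command).
command_line = [
    "--runner", "DataflowRunner",
    "--project", "YOUR_PROJECT_ID",
    "--staging_location", "gs://YOUR_BUCKET_NAME/staging",
    "--template_location", "gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME",
]

# As in the snippet: options come from a fixed list, so command_line is never consulted.
hardcoded = parse_options(["--output", "some/output_path"])

# Options built from the flags that were actually passed, plus the required --output.
forwarded = parse_options(command_line + ["--output", "some/output_path"])

if __name__ == "__main__":
    print(hardcoded.runner, hardcoded.template_location)  # None None
    print(forwarded.runner)  # DataflowRunner
```

In Beam terms, the equivalent would be constructing the options from the real command line (for example PipelineOptions() with no list) and supplying --output there as well.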
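The documentation also states:
Some I/O connectors contain methods that accept ValueProvider objects. To determine support for a specific connector and method, see the API reference documentation for the I/O connector. The following I/O connectors accept runtime parameters:
File-based IOs: textio, avroio, tfrecordio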
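Roughly, a connector that accepts a ValueProvider stores the provider object and only reads its value while the job runs; in Beam, reading a RuntimeValueProvider before runtime options have been set raises RuntimeValueProviderError, which is the "not accessible" message in the traceback above. The following is a toy, stdlib-only model of that deferral, not Beam's implementation; ToyRuntimeValue, ToyTextSource, run_with and ValueNotAccessibleError are hypothetical names.

```python
class ValueNotAccessibleError(Exception):
    """Raised when a deferred value is read outside the run-time context."""


class ToyRuntimeValue:
    """Hypothetical stand-in for a templated (run-time) parameter."""

    # Shared run-time options; None means the job is not running yet.
    runtime_options = None

    def __init__(self, option_name, default_value=None):
        self.option_name = option_name
        self.default_value = default_value

    def is_accessible(self):
        return ToyRuntimeValue.runtime_options is not None

    def get(self):
        if not self.is_accessible():
            raise ValueNotAccessibleError(f"{self.option_name} not accessible")
        return ToyRuntimeValue.runtime_options.get(self.option_name, self.default_value)


class ToyTextSource:
    """Hypothetical connector: keeps the provider and resolves it lazily."""

    def __init__(self, file_pattern):
        # No .get() here, so building the pipeline graph never needs the value.
        self.file_pattern = file_pattern

    def describe(self):
        # The value is resolved only when the "job" runs.
        return f"reading {self.file_pattern.get()}"


def run_with(options, source):
    """Simulate a job run: install run-time options, use the source, then clear them."""
    ToyRuntimeValue.runtime_options = options
    try:
        return source.describe()
    finally:
        ToyRuntimeValue.runtime_options = None


if __name__ == "__main__":
    source = ToyTextSource(ToyRuntimeValue("input"))  # construction succeeds
    try:
        source.describe()  # read before run time
    except ValueNotAccessibleError as err:
        print(err)  # input not accessible
    print(run_with({"input": "gs://YOUR_BUCKET_NAME/input.txt"}, source))
    # reading gs://YOUR_BUCKET_NAME/input.txt
```

The design point the toy illustrates is that passing the provider object (rather than calling .get() on it) is what lets the value stay unresolved when the template is created.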
I'm not sure why the example code raises this error. Can anyone help?
For what it's worth, I'm using:
apache-beam = {extras = ["gcp"], version = "^2.19.0"}