猿问

使用 Apach Beam 执行集差

我有两个列表a,b它们之间有一些共同的元素,我想找到那些共同的元素及其数量,为此我编写了以下程序。


import functools

import apache_beam as beam

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

from apache_beam.runners.direct.direct_runner import DirectRunner

from apache_beam.options.pipeline_options import PipelineOptions


options = PipelineOptions()


p = beam.Pipeline(InteractiveRunner(underlying_runner=DirectRunner()), options=options)


def form_pair(element, side_input):

  for i,e in enumerate(side_input):

    if e == element:

      return i,e



a = ['a','b', 'c', 'c', 'b']

b = ['a','a','b', 'c', 'b', 'b','d', 'e', 'f']


x0 = p | "0" >> beam.Create(a) | "1" >> beam.Distinct()

x1 = beam.pvalue.AsList(x0)



x3 = p | "2" >> beam.Create(b)

x4 = x3 | "3" >> beam.Map(functools.partial(form_pair, side_input=x1))

x5 = x4 | "4" >> beam.combiners.Count.PerKey()



r = p.run().wait_until_finish()


print(r.get(x5))

这给了我以下错误


TypeError: 'AsList' object is not iterable [while running '3']


回首忆惘然
浏览 96回答 1
1回答

尚方宝剑之说

我传递的侧面输入功能beam.Map不正确这是正确的方法x4 = x3 | "3" >> beam.Map(form_pair, x1)而不是x4 = x3 | "3" >> beam.Map(functools.partial(form_pair, side_input=x1))which 是错误的。
随时随地看视频慕课网APP

相关分类

Python
我要回答