猿问

Apache Beam Go SDK:如何将 PCollection<string> 转换为

我正在使用 Apache Beam Go SDK 并且很难以正确的格式获取PCollection以按键进行分组/组合。


我在 PCollection 的字符串中每个键有多个记录,如下所示:


Bob, cat

Bob, dog

Carla, cat

Carla, bunny

Doug, horse

我想使用GroupByKey和CombinePerKey,这样我就可以像这样汇总每个人的宠物:


Bob, [cat, dog]

Carla, [cat, bunny]

Doug, [horse]

如何将 PCollection<string> 转换为 PCollection<KV<string, string>>?


他们在这里提到了类似的东西,但不包括聚合字符串值的代码。


我可以使用 ParDo 获取字符串键和字符串值,如下所示,但我不知道如何转换为 GroupPerKey 输入所需的 KV<string, string> 或 CoGBK<string, string> 格式。


pcolOut := beam.ParDo(s, func(line string) (string, string) {

  cleanString := strings.TrimSpace(line)

  openingChar := ","

  iStart := strings.Index(cleanString, openingChar)

  key := cleanString[0:iStart]

  value := cleanString[iStart+1:]

        

// How to convert to PCollection<KV<string, string>> before returning?

  return key, value

}, pcolIn)


groupedKV := beam.GroupByKey(s, pcolOut) 

它失败并出现以下错误。有什么建议么?


panic:  inserting ParDo in scope root

        creating new DoFn in scope root

        binding fn main.main.func2

        binding params [{Value string} {Value string}] to input CoGBK<string,string>

values of CoGBK<string,string> cannot bind to {Value string}


蛊毒传说
浏览 110回答 1
1回答

汪汪一只猫

要映射到 KV,您可以应用 MapElements 并使用 into() 来设置 KV 类型,并在 via() 逻辑中创建一个新KV.of(myKey, myValue)的 ,例如,要获取一个KV<String,String>,请使用以下内容:&nbsp; &nbsp; PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(&nbsp; &nbsp; &nbsp; &nbsp; TypeDescriptors.kvs(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TypeDescriptors.strings(),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TypeDescriptors.strings()))&nbsp; &nbsp; &nbsp; &nbsp; .via(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; linkpage -> KV.of(dataFile, linkpage)));
随时随地看视频慕课网APP

相关分类

Go
我要回答