我正在尝试执行去规范化操作,我需要使用以下逻辑重新组织表:
| itemid | class | value |
+--------+-------+-------+
| 1 | A | 0.2 | | itemid | value A | value B | value C |
| 1 | B | 10.3 | ==> +--------+---------+---------+---------+
| 2 | A | 3.0 | ==> | 1 | 0.2 | 10.3 | |
| 2 | B | 0.2 | ==> | 2 | 3.0 | 0.2 | |
| 3 | A | 0.0 | | 3 | 0.0 | 1.2 | 5.4 |
| 3 | B | 1.2 |
| 3 | C | 5.4 |
我的方法是执行一个 for 循环以按 进行过滤class,前提是我知道先验类列表,然后加入生成的 pcollection。
高级代码:
CLASSES = ["A", "B", "C"]
tables = [
(
data
| "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for cin CLASSES
]
和加入:
_ = (
tables
| "Flatten" >> beam.Flatten()
| "Join Collections" >> beam.GroupByKey()
| "Remove key" >> beam.MapTuple(lambda _, val: val)
| "Merge dicts" >> beam.ParDo(mergeDicts())
| "Write to GCS" >> beam.io.WriteToText(output_file)
)
与(根据 Peter Kim 的建议进行编辑):
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
我这里的问题是,当管道在 Apache Beam 计算引擎中执行时,我获得了由列表的最后一个元素过滤的相同 pcollections,在本例中是 C。
[已添加] 看起来 Apache Beam 引擎采用最终状态的迭代变量,这意味着迭代列表的最后一个元素,用于所有调用的分支。
我显然采用了错误的方法,但是哪种方法应该是执行此操作的最佳方法?
Qyouu
宝慕林4294392
函数式编程
相关分类