Cartesian product of a PCollection with itself

Suppose I have a bounded PCollection p of type KV<String, Integer>. Assume p does not fit in memory and therefore cannot be a side input to a DoFn.


Example p:


("foo", 0)

("bar", 1)

("baz", 2)

How can I take the Cartesian product of p with itself?


For example, p x p might look like this:


("foo+foo", [("foo", 0), ("foo", 0)])

("foo+bar", [("foo", 0), ("bar", 1)])

("foo+baz", [("foo", 0), ("baz", 2)])

("bar+foo", [("bar", 1), ("foo", 0)])

("bar+bar", [("bar", 1), ("bar", 1)])

("bar+baz", [("bar", 1), ("baz", 2)])

("baz+foo", [("baz", 2), ("foo", 0)])

("baz+bar", [("baz", 2), ("bar", 1)])

("baz+baz", [("baz", 2), ("baz", 2)])


慕神8447489
2 Answers

慕桂英4014372

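As you surmised, the simplest approach is a DoFn that processes the PCollection as both its main input and a side input. If that is not possible because the PCollection is too large to fit in memory, you can partition it into N disjoint PCollections, pass the full PCollection over each partition as a side input, and flatten the results. For example, you could write something like:

    import apache_beam as beam

    class CrossProduct(beam.PTransform):
      def expand(self, pcoll):
        N = 10  # number of partitions; choose it so that one partition fits in memory
        # Split the input into N disjoint PCollections.
        parts = pcoll | beam.Partition(lambda element, n: hash(element) % n, N)
        # Cross the full input with each partition, using the partition as a side input.
        cross_parts = [
            pcoll | str(ix) >> beam.FlatMap(
                lambda x, side: [(x, s) for s in side],
                beam.pvalue.AsIter(part))
            for ix, part in enumerate(parts)]
        return cross_parts | beam.Flatten()

    output = input | CrossProduct()

Note, however, that unless the elements of the PCollection are particularly large, a PCollection that cannot fit in memory will have a cross product that may be prohibitively large to produce (and process).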
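To see why partitioning still yields the complete product, here is a minimal plain-Python sketch of the same partition, cross, and flatten idea. It does not use Beam. The helper names `partition_index` and `partitioned_cross_product` are made up for illustration, and `zlib.crc32` is swapped in for `hash()` only so that the bucket assignment is the same on every run.

```python
import itertools
import zlib


def partition_index(element, n):
    """Return a deterministic bucket in range(n) for an element."""
    # crc32 of the repr is stable across processes, unlike hash() on str.
    return zlib.crc32(repr(element).encode("utf-8")) % n


def partitioned_cross_product(items, n):
    """Cross `items` with each of n disjoint parts and concatenate the results."""
    parts = [[] for _ in range(n)]
    for item in items:
        parts[partition_index(item, n)].append(item)
    # Each part plays the role of one side input; every element of `items`
    # is paired with every element of every part exactly once.
    return [(x, s) for part in parts for x in items for s in part]


p = [("foo", 0), ("bar", 1), ("baz", 2)]
result = partitioned_cross_product(p, n=2)
```

Because the parts are disjoint and together cover the whole input, concatenating the per-part results gives every ordered pair exactly once, although in a different order than a plain nested loop.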

慕田峪9158850

I'll propose a solution in Python. First let's implement the algorithm, then deal with the memory constraint.

    import itertools

    # Let's build a list with your pairs
    collection_items = [("foo", 0), ("bar", 1), ("baz", 2)]

    # A Python generator produces a sequence of results lazily. It keeps its
    # local state, so it resumes exactly where it left off each time it is
    # advanced. The same generator can't be used twice.
    # I will explain a little later why I use generators.
    collection_generator1 = (el for el in collection_items)  # Create the first generator
    # For example: next(collection_generator1) => ("foo", 0); then ("bar", 1); then ("baz", 2)
    collection_generator2 = (el for el in collection_items)  # Create the second generator

    cartesian_product = itertools.product(collection_generator1, collection_generator2)  # Create the cartesian product

    for pair in cartesian_product:
        first_el, second_el = pair
        str_pair1, val_pair1 = first_el
        str_pair2, val_pair2 = second_el  # unpack the second element, not the first
        name = "{str_pair1}+{str_pair2}".format(str_pair1=str_pair1, str_pair2=str_pair2)
        item = (name, [first_el, second_el])  # Compose the item
        print(item)

    # OUTPUT
    # ('foo+foo', [('foo', 0), ('foo', 0)])
    # ('foo+bar', [('foo', 0), ('bar', 1)])
    # ('foo+baz', [('foo', 0), ('baz', 2)])
    # ('bar+foo', [('bar', 1), ('foo', 0)])
    # ('bar+bar', [('bar', 1), ('bar', 1)])
    # ('bar+baz', [('bar', 1), ('baz', 2)])
    # ('baz+foo', [('baz', 2), ('foo', 0)])
    # ('baz+bar', [('baz', 2), ('bar', 1)])
    # ('baz+baz', [('baz', 2), ('baz', 2)])

Now let's deal with the memory problem. Since you have a lot of data, it is better to store it in a file, one pair per line (as in the example). Then read the file ("input.txt") and create generators over its data.

    import ast

    # Each line is assumed to hold one Python literal such as ("foo", 0).
    file_generator_1 = (ast.literal_eval(line.strip()) for line in open("input.txt"))
    file_generator_2 = (ast.literal_eval(line.strip()) for line in open("input.txt"))

The only change you then need is to replace collection_generator1 and collection_generator2 with file_generator_1 and file_generator_2.

One caveat: itertools.product copies each of its inputs into a tuple before it yields the first pair, so memory use still grows with the size of the file. A nested loop that re-reads the file for each outer line keeps only two rows in memory at a time.
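If memory really is the constraint, a nested loop over the file may be a safer starting point than `itertools.product`. The following is a minimal sketch, assuming each line of the file holds one Python literal such as `("foo", 0)`. The function names `read_pairs` and `streaming_cross_product` are hypothetical, and a temporary file stands in for input.txt.

```python
import ast
import os
import tempfile


def read_pairs(path):
    """Lazily yield one (name, value) pair per non-empty line of the file."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                yield ast.literal_eval(line)


def streaming_cross_product(path):
    """Yield (name, [first, second]) for every ordered pair of rows."""
    for first in read_pairs(path):
        # Re-read the file for each outer row, so only two rows are held at once.
        for second in read_pairs(path):
            yield ("{}+{}".format(first[0], second[0]), [first, second])


# Small demonstration; the temporary file stands in for input.txt.
with tempfile.NamedTemporaryFile(
        "w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
    tmp.write('("foo", 0)\n("bar", 1)\n("baz", 2)\n')
    demo_path = tmp.name

demo_items = list(streaming_cross_product(demo_path))
os.remove(demo_path)
```

The trade-off is I/O: the file is read once per row, so the run time is quadratic in the number of rows, which is inherent to producing a full cross product anyway.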