猿问

如何将RDD拆分为两个或多个RDD?

如何将RDD拆分为两个或多个RDD?

我正在寻找一种将RDD分割成两个或多个RDD的方法。我最近看到的是ScalaSPark:将集合拆分成几个RDD?仍然是一个单一的RDD。

如果您熟悉SAS,如下所示:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

这就产生了两个不同的数据集。必须立即坚持才能得到我想要的结果.。


凤凰求蛊
浏览 4222回答 3
3回答

慕沐林林

不可能从单个转换*生成多个RDDs。如果要拆分RDD,则必须应用filter对于每个分裂的条件。例如:def even(x): return x % 2 == 0def odd(x): return not even(x)rdd = sc.parallelize(range(20))rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))如果您只有二进制条件,而且计算成本很高,您可能更喜欢这样的东西:kv_rdd = rdd.map(lambda x: (x, odd(x)))kv_rdd.cache()rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()它只意味着一个谓词计算,但需要对所有数据进行额外的传递。需要注意的是,只要输入rdd被正确地缓存,并且没有关于数据分布的附加假设,在重复过滤器和嵌套if-倒换循环之间的时间复杂度方面没有显著差异。对于N个元素和M条件,您必须执行的操作数显然与N乘以M成正比。如果是for-循环,它应该更接近于(N+MN)/2,重复滤波器正好是NM,但在一天结束时,它只不过是O(NM)。你可以看到我的讨论*詹森·伦德曼读一些正反两方面的文章。在非常高的层次上,您应该考虑两件事:星火转换是懒惰的,直到您执行一个操作,您的rdd才会成为现实。这有什么关系?回到我的例子:rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))如果以后我决定我只需要rdd_odd那么就没有理由去实现rdd_even.如果您查看一下要计算的SAS示例work.split2您需要同时实现输入数据和work.split1.RDDs提供了一个声明性API。当你使用filter或map这完全取决于火花引擎如何执行这一操作。只要传递给转换的函数是无副作用的,它就会为优化整个管道创造多种可能性。到头来,这起案件还不够特殊,不足以证明它本身的转变是合理的。这个带有过滤器图案的地图实际上是在一个核心星火中使用的。见我对Sparks RDD.随机Split实际上是如何分割RDD的?和一个相关部分.的.randomSplit方法。如果唯一的目标是实现输入的分割,则可以使用partitionBy条款DataFrameWriter哪种文本输出格式:def makePairs(row: T): (String, String) = ??? data   .map(makePairs).toDF("key", "value")   .write.partitionBy($"key").format("text").save(...)*星火只有三种基本类型的转换:RDD[T]=>RDD[T]RDD[T]=>RDD[U](RDD[T],RDD[U])=>RDD[W]其中T,U,W可以是原子类型,也可以是原子类型产品/元组(K,V)。任何其他操作都必须使用上述的某种组合来表示。你可以检查原始RDD论文更多细节。** http:/chat.stackoverflow*另见ScalaSPark:将集合拆分成几个RDD?

白猪掌柜的

一种方法是使用自定义分区程序根据筛选条件对数据进行分区。这可以通过扩展Partitioner并实现类似于RangePartitioner.然后,可以使用映射分区从分区RDD构造多个RDD,而无需读取所有数据。val filtered = partitioned.mapPartitions { iter => {   new Iterator[Int](){     override def hasNext: Boolean = {       if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {         false       } else {         iter.hasNext      }     }     override def next():Int = iter.next()   }请注意,筛选的RDD中的分区数将与分区RDD中的分区数相同,因此应该使用合并来减少这一点,并删除空分区。
随时随地看视频慕课网APP
我要回答