月关宝盒
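fold in Apache Spark is not the same as fold on non-distributed collections. In fact, it requires a commutative function to produce deterministic results:

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

This has been shown by Mishael Rosenthal and was suggested by Make42 in his comment.

It has been suggested that the observed behavior is related to HashPartitioner, when in fact parallelize does not shuffle and does not use HashPartitioner.

    import org.apache.spark.sql.SparkSession

    /* Note: standalone (non-local) mode */
    val master = "spark://...:7077"

    val spark = SparkSession.builder.master(master).getOrCreate()
    // The snippet below uses `sc`, so take it from the session.
    val sc = spark.sparkContext

    /* Note: deterministic order */
    val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
    require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

    /* Note: all possible permutations */
    require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Explanation:

The structure of fold for an RDD (abridged to declarations, so not compilable as written)

    def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
      var jobResult: T
      val cleanOp: (T, T) => T
      val foldPartition: Iterator[T] => T
      val mergeResult: (Int, T) => Unit
      sc.runJob(this, foldPartition, mergeResult)
      jobResult
    }

is the same as the structure of reduce for an RDD:

    def reduce(f: (T, T) => T): T = withScope {
      val cleanF: (T, T) => T
      val reducePartition: Iterator[T] => Option[T]
      var jobResult: Option[T]
      val mergeResult: (Int, Option[T]) => Unit
      sc.runJob(this, reducePartition, mergeResult)
      jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
    }

Here runJob is performed with disregard for partition order, which is why a commutative function is needed.

foldPartition and reducePartition are equivalent in terms of processing order, and are effectively implemented (through inheritance and delegation) by foldLeft and reduceLeft on TraversableOnce.

Conclusion: fold on an RDD cannot depend on the order of chunks, and needs both commutativity and associativity.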
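The two-stage behavior described above can be imitated without a cluster. The following is a minimal sketch in plain Scala, not Spark's implementation: `FoldOrderSketch`, `simulateFold`, and the `arrival` parameter are hypothetical names introduced for illustration, and the assumption is that each partition is folded left to right while the partial results are merged in an arbitrary arrival order.

```scala
// Plain-Scala simulation; no Spark required. All names here are hypothetical.
object FoldOrderSketch {

  /** Folds each partition locally, then merges the partial results
    * in the order given by `arrival` (indices into `partitions`). */
  def simulateFold[T](partitions: Seq[Seq[T]], arrival: Seq[Int], zero: T)(op: (T, T) => T): T = {
    // Stage 1: every partition is folded on its own, starting from the zero value.
    val partials = partitions.map(_.foldLeft(zero)(op))
    // Stage 2: partial results are merged in whatever order they arrive.
    arrival.foldLeft(zero)((acc, i) => op(acc, partials(i)))
  }

  // Four single-element partitions, mirroring parallelize(Seq("a", "b", "c", "d"), 4).
  val partitions: Seq[Seq[String]] = Seq(Seq("a"), Seq("b"), Seq("c"), Seq("d"))

  // String concatenation is associative but not commutative:
  // each arrival order produces a different string.
  val concatResults: Set[String] =
    partitions.indices.toList.permutations
      .map(order => simulateFold(partitions, order, "")(_ + _))
      .toSet

  // Integer addition is associative and commutative:
  // the arrival order does not matter.
  val numbers: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3), Seq(4))
  val sumResults: Set[Int] =
    numbers.indices.toList.permutations
      .map(order => simulateFold(numbers, order, 0)(_ + _))
      .toSet

  def main(args: Array[String]): Unit = {
    println(s"distinct concatenation results: ${concatResults.size}")
    println(s"distinct sum results: ${sumResults.size}")
  }
}
```

With concatenation, the simulation yields as many distinct results as there are arrival orders, which matches the 24 permutations checked in the Spark snippet. With addition, it yields a single result.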