猿问

Difference between reduce and foldLeft / fold in functional programming

Why do Scala and frameworks like Spark and Scalding have both reduce and foldLeft? And what is the difference between reduce and fold?
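For context, this is how the three methods differ on ordinary (non-distributed) Scala collections; a minimal sketch, not part of the original question:

```scala
val xs = List(1, 2, 3)

// reduce: no zero element; throws UnsupportedOperationException on an empty list
assert(xs.reduce(_ + _) == 6)

// fold: takes a zero element of (a supertype of) the element type; safe on empty input
assert(xs.fold(0)(_ + _) == 6)
assert(List.empty[Int].fold(0)(_ + _) == 0)

// foldLeft: the accumulator may have a different type than the elements,
// and traversal is strictly left-to-right
assert(xs.foldLeft("")(_ + _) == "123")
```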



大话西游666
1015 views · 3 answers
3 Answers

月关宝盒

`fold` in Apache Spark is not the same as `fold` on non-distributed collections. In fact, it requires a commutative function to produce deterministic results:

> This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

This has been shown by Mishael Rosenthal and suggested by Make42 in his comment.

It has also been suggested that the observed behavior is related to `HashPartitioner`, when in fact `parallelize` doesn't shuffle and doesn't use `HashPartitioner`:

```scala
import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"

val spark = SparkSession.builder.master(master).getOrCreate()
val sc = spark.sparkContext

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all possible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
```

Explanation: the structure of `fold` for an RDD (simplified)

```scala
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition: Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
```

is the same as the structure of `reduce` for an RDD:

```scala
def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult: (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
```

Here `runJob` is performed with no regard for partition order, which is what creates the need for a commutative function. `foldPartition` and `reducePartition` are equivalent in terms of processing order; they are effectively implemented (by inheritance and delegation) via `reduceLeft` and `foldLeft` on `TraversableOnce`.

Conclusion: `fold` on an RDD cannot depend on the order of the chunks, so it needs both commutativity and associativity.
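To see concretely why partition-wise folding needs commutativity while `foldLeft` does not, one can simulate it with plain Scala collections (no Spark here; the "partitions" and the arbitrary merge order are a hand-rolled illustration):

```scala
val data = Seq("a", "b", "c", "d")
val zero = ""
val op: (String, String) => String = _ + _ // concatenation: associative, NOT commutative

// Sequential foldLeft: order is fixed, so the result is deterministic.
assert(data.foldLeft(zero)(op) == "abcd")

// Partition-wise fold: fold each "partition" separately, then merge the
// per-partition results in an arbitrary order, as a distributed fold may.
val partitions = data.grouped(1).toSeq                   // 4 single-element partitions
val perPartition = partitions.map(_.foldLeft(zero)(op))  // Seq("a", "b", "c", "d")
val merged = perPartition.permutations.map(_.foldLeft(zero)(op)).toSet

// 4! = 24 distinct outcomes: a non-commutative op gives non-deterministic results.
assert(merged.size == 24)
```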