combineByKey():
(createCombiner, mergeValue, mergeCombiners, partitioner)
最常用的基于key的聚合函数,返回的类型可以与输入类型不一样。
许多基于key的聚合函数都用到了它,像groupByKey()
遍历partition中的元素,元素的key,要么之前见过的,要么不是。
如果是新元素,使用我们提供的createCombiner()函数
如果是这个partition中已经存在的key,就会使用mergeValue()函数
合并每个partition的结果的时候,使用mergeCombiners()函数
combineByKey():
(createCombiner, mergeValue, mergeCombiners, partitioner)
最常用的基于key的聚合函数,返回的类型可以与输入类型不一样。
许多基于key的聚合函数都用到了它,像groupByKey()
遍历partition中的元素,元素的key,要么之前见过的,要么不是。
如果是新元素,使用我们提供的createCombiner()函数
如果是这个partition中已经存在的key,就会使用mergeValue()函数
合并每个partition的结果的时候,使用mergeCombiners()函数
KeyValue对RDDs(4)
combineByKeys() example+
KeyValue对RDDs(4)
combineByKeys() example
KeyValue对RDDs(4)
combineByKey()
KeyValue对RDDs(4)
KeyValue对RDDs(4)
CombineByKey
combineBykey一次没看懂,过会儿学完Scala 和 函数式变成再过来学习一遍。。。。。
RDDs combineByKey()
combineByKey的运用
通过前面计算的分数总和,算出平均数
这就是通过combinebykey求各科目考试分数的总和,有些抽象,需要多多理解!很重要,如下图所示。
手动创建rdds【scores分数】
打印结果如下图所示,接下来再使用bombinebykey求平均值。。。
keyvalue对rdds的combinebykey():
遍历partition中的元素,元素的key,要么之前见过的,要么不是。
如果是新元素,使用我们提供的createcombiner()函数;
如果是这个partition中已经存在的key,就会使用mergevalue()函数;
合计每个partition的结果的时候,使用mergecombiners()函数。
keyvalue对rdds的combinebykey():
(createcombiner,mergevalue,mergecombiners,partitioner)
最常用的基于key的聚合函数,返回的类型可以与输入类型不一样
许多基于key的聚合函数都用到了它,像groupbykey()
实例:
计算品均值:
val scores=sc.parallelize(("jake",80.0),("jake",90.0),("jake",85.0),("mike",85.0),("mike",92.0),("mike",90.0))
scores.foreach(println)
val score2=scores.combineByKey(x=>(1,x),(c1:(Int,Double),newScore)=>(c1._1+1,c1.2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
其中x代表分数
score2.foreach(println)
val average=score2.map{case(name,(num,score))=>(name,score/num)}
average.foreach(println)
CombineByKey 最常用的聚合函数