keyvalue对rdds:
创建keyvalue对rdds,使用map()函数,返回key/value对
例如,包含数行数据的rdd,把每行数据的第一个单词作为keys。
val rdd2 = rdd.map(line=>(line.split(" ")(0), line))
KeyValue对RDDs(3)
KeyValue对RDDs(2)
KeyValue对RDDs
keyvalue RDDs
keyvalue RDDs
课程小结
keyvalue对rdds的创建
keyvalue对rdds的操作(常见的)
常见的keyvalue对rdds,
mapvalues(func)
flatmapvalues(func)
keys()
values()
sortbykey()
创建keyvalue对rdds,如reducebykey可以将相同key的value值相加作为该key的value;
而groupbykey,按相同的key将values进行分组。
手动创建keyvalue对rdds,如rdd3,如下图所示:
keyvalue对rdds的transformations,手动处理。
打印的rdd2结果如下图所示。
加载文件
打印文件
创建keyvalue对。如rdd2.
keyvalue对rdds:
创建keyvalue对rdds,使用map()函数,返回key/value对
例如,包含数行数据的rdd,把每行数据的第一个单词作为keys。
KeyValue对RDDS 创建KeyValue对RDDS: 使用map()函数,返回key/value对 eg: 包含数行数据的RDD,把每一行数据的第一个单词作为keys; val rdd=sc.textFile("/home/1707498/YC_test/words.txt") rdd.foreach(println) var rdd2=rdd.map(line=>(line.spilt(" ")(0),line)) 每一行的第一个座位key,整行数据座位value KeyValue对Rdds的Transformation eg:val rdd=sc.parallelize(Array((1,2),(3,4),(3,6))) rdd.reduceByKey(func): 把相同的key结合 eg:rdd.reduceByKey((x,y)=>x+y) 结果:{1,2),(3,10)} rdd.groupByKeyByKey(func):把相同key的values分组 eg:rdd.groupByKeyByKey((x,y)=>x+y) 结果:{(1,[2]),(3,[4,6])} rdd.mapValues(func):函数作用于pairRDD的每个元素,key不变 eg:rdd.mapValues(x=>x+1) 结果:{(1,3),(3,5),(3,7)} rdd.flatMapValues(func):符号化的时候使用; eg:rdd.flatMapValues(x=>x to 5) 结果:{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} keys():仅返回keys values():仅返回values sortByKey():按照key排序的RDD *重要 combinByKey(createCombiner,mergeValue,mergeCombiners,partitioner):把相同的key结合,使用不同的返回类型 最常用的聚合函数,返回的类型可以与输入类型不一样,许多; 遍历partition中的元素,元素的key,要么之前见过的,要么不是(rdd中很多个分区组成); 如果是新元素key,使用我们提供的createCombiner()函数(相当于初始化) 如果是这个partition中已经存在的key,就会使用mergeValue()函数(相当于整合) 合计每个partition的结果的时候,使用mergeCombiners()函数 eg:求平均值 val scores=sc.parallelize(Array(("jake",80.0),("jake",90.0),("jake",90.0),("mary",89.0),("mary",40.0),("mary",90.0)) scores.foreach(println) val score2=scores.combinByKey(x=>(1,x),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2))) x=>(1,x) 想要求平均值,需要知道科目的总和,科目的个数,每遍历一个新key记1 c1:(Int,Double) int表示累计科目数,double累计分数 c1._1+1,c1._2+newScore c1._1取的第一个值,遇到新的key就加1 c1._2+newScore 分数累加 c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2) 每一个分区中汇总的科目数、分数汇总 val score2=scores.combinByKey(x=>(1,x),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2_1,c1._2+c2._2))) val average=score2.map{case(name,(num,score))=>(name,score/num)} case(name,(num,score)) 判断传递过来的类型是否正确 name,score/num) 正确的话便执行求均值
1.spark持久化的集中方式
KeyValue对RDDs常见操作2
KeyValue对RDDs常见操作1