RDD介绍
RDD概念
一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupBy)而创建得到新的RDD。
RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操作的主要区别是,转换操作(比如map、filter、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)。
RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。
1.jpg
如下图从输入中逻辑上生成A和C两个RDD,经过一系列“转换”操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生,Spark只是记录了RDD之间的生成和依赖关系。当F要进行输出时,也就是当F进行“行动”操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算。
[图片上传中...(dependency.jpg-68480a-1526525698274-0)]
2.jpg
eg:在pyspark的交互环境下,输入如下代码
fileRDD = sc.textFile('/test.txt')def contains(line):... return 'hello world' in line filterRDD = fileRDD.filter(contains) filterRDD.cache() filterRDD.count()
第一行: 从HDFS文件中读取数据创建RDD
第二、三行: 定义一个过滤函数
第四行:对fileRDD进行转换操作得到一个新的RDD,即filterRDD
第五行:第5行代码表示对filterRDD进行持久化,把它保存在内存或磁盘中(这里采用cache接口把数据集保存在内存中),方便后续重复使用,当数据被反复访问时(比如查询一些热点数据,或者运行迭代算法)
第六行:count()是一个行动类型的操作,触发真正的计算,开始实际执行从fileRDD到filterRDD的转换操作,并把结果持久化到内存中,最后计算出filterRDD中包含的元素个数。
RDD的特性
(1)高效的容错性。现有的分布式共享内存、键值存储、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,也就是在节点之间会发生大量的数据传输,这对于数据密集型应用而言会带来很大的开销。在RDD的设计中,数据只读,不可修改,如果需要修改数据,必须从父RDD转换到子RDD,由此在不同RDD之间建立了血缘关系。所以,RDD是一种天生具有容错机制的特殊集合,不需要通过数据冗余的方式(比如检查点)实现容错,而只需通过RDD父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无需回滚整个系统,这样就避免了数据复制的高开销,而且重算过程可以在不同节点之间并行进行,实现了高效的容错。此外,RDD提供的转换操作都是一些粗粒度的操作(比如map、filter和join),RDD依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(比如对哪个数据项进行了修改),这就大大降低了数据密集型应用中的容错开销;
(2)中间结果持久化到内存。数据在内存中的多个RDD操作之间进行传递,不需要“落地”到磁盘上,避免了不必要的读写磁盘开销;
(3)存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化开销。
RDD的依赖关系
RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖。RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)
窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区;窄依赖典型的操作包括map、filter、union等
宽依赖则表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。宽依赖典型的操作包括groupByKey、sortByKey等
dependency.jpg
对于连接(join)操作,可以分为两种情况。
(1)对输入进行协同划分,属于窄依赖(如图9-10(a)所示)。所谓协同划分(co-partitioned)是指多个父RDD的某一分区的所有“键(key)”,落在子RDD的同一个分区内,不会产生同一个父RDD的某一分区,落在子RDD的两个分区的情况。
(2)对输入做非协同划分,属于宽依赖,如图9-10(b)所示。
对于窄依赖的RDD,可以以流水线的方式计算所有父分区,不会造成网络之间的数据混合。对于宽依赖的RDD,则通常伴随着Shuffle操作,即首先需要计算好所有父分区数据,然后在节点之间进行Shuffle。
Stage的划分
Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算(具体的阶段划分算法请参见AMP实验室发表的论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》)。例如,如图9-11所示,假设从HDFS中读入数据生成3个不同的RDD(即A、C和E),通过一系列转换操作后再将计算结果保存回HDFS。对DAG进行解析时,在依赖图中进行反向解析,由于从RDD A到RDD B的转换以及从RDD B和F到RDD G的转换,都属于宽依赖,因此,在宽依赖处断开后可以得到三个阶段,即阶段1、阶段2和阶段3。可以看出,在阶段2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,比如,分区7通过map操作生成的分区9,可以不用等待分区8到分区9这个转换操作的计算结束,而是继续进行union操作,转换得到分区13,这样流水线执行大大提高了计算的效率。
4.jpg
一个DAG图划分成多个“阶段”以后,每个阶段都代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合。每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给Executor运行。
RDD的运行流程
5.jpg
(1)创建RDD对象;
(2)SparkContext负责计算RDD之间的依赖关系,构建DAG;
(3)DAGScheduler负责把DAG图分解成多个阶段,每个阶段中包含了多个任务,每个任务会被任务调度器分发给各个工作节点(Worker Node)上的Executor去执行。
RDD编程
准备工作
开启hdfs 开启spark
创建RDD
(1)从文件系统中加载数据创建rdd
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
如果不加file://,那么默认是从hdfs上取文件,以下三条命令等价
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") lines = sc.textFile("/user/hadoop/word.txt") lines = sc.textFile("word.txt")
注意
如果使用了本地文件系统的路径,那么,必须要保证在所有的worker节点上,也都能够采用相同的路径访问到该文件,比如,可以把该文件拷贝到每个worker节点上,或者也可以使用网络挂载共享文件系统。
textFile()方法的输入参数,可以是文件名,也可以是目录,也可以是压缩文件等。比如,textFile(“/my/directory”), textFile(“/my/directory/.txt”), and textFile(“/my/directory/.gz”).
textFile()方法也可以接受第2个输入参数(可选),用来指定分区的数目。默认情况下,Spark会为HDFS的每个block创建一个分区(HDFS中每个block默认是128MB)。你也可以提供一个比block数量更大的值作为分区数目,但是,你不能提供一个小于block数量的值作为分区数目。
(2)通过并行集合(数组)创建RDD
nums = [1,2,3,4,5] rdd = sc.parallelize(nums)
RDD操作
转换操作:
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
下面列出一些常见的转换操作(Transformation API):
filter(func):筛选出满足函数func的元素,并返回一个新的数据集
map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
行动操作:
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
下面列出一些常见的行动操作(Action API):
count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前n个元素
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数func中运行
惰性计算:
eg:统计单词总数
sc = SparkContext("local",'test') line = sc.textFile("/home/tobin/Documents/hdfsLocalStore/harryport/1.txt") //map操作会遍历每行文本 linelength = line.map(lambda s:len(s))#知道最后一步才开始执行,之前并没有执行,惰性计算totallength = linelength.reduce(lambda a,b:a+b) print(totallength)
持久化
from pyspark import SparkContext sc = SparkContext("local",'test')list =["hadoop","spark","Hive"] rdd = sc.parallelize(list)print(rdd.count())print(','.join(rdd.collect()))
有两个行动算子,以为着每次调用行动操作,都会出发一次从头开始的计算,为了避免重复计算,使用cache()将中间变量持久化到内存中,在上述代码的中间加入如下代码
rdd.cache()
分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。
如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)
打印元素
rdd.foreach(print)orrdd.map(print)or//集群模式下把各节点的数据打印出来rdd.collect().foreach(print)
键值对RDD
(1)用map()函数创建键值对rdd
from pyspark import SparkContext sc =SparkContext('local','test') lines=sc.textFile("/home/tobin/Documents/hdfsLocalStore/harryport/1.txt") pairrdd = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)) pairrdd.foreach(print)
(2)通过并行集合(列表)创建rdd
from pyspark import SparkContext sc =SparkContext('local','test') list =['hadoop','spark','hive','zookeeper'] rdd = sc.parallelize(list) pairrdd = rdd.map(lambda word:(word,1)) pairrdd.foreach(print
键值对转换操作
reduceByKey(func):使用func函数合并具有相同键的值
groupByKey() ;对具有相同键的值进行分组
keys(): 把键值对RDD中的key返回形成一个新的RDD
values():键值对RDD中的value返回形成一个新的RDD
sortByKey():sortByKey()的功能是返回一个根据键排序的RDD。
mapValues(func):只想对键值对RDD的value部分进行处理
join:类似于数据库的表连接
eg: 求相同的字符串出现的平均值
from pyspark import SparkContext sc =SparkContext('local','test') rdd =sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)]) rdd =rdd.mapValues(lambda x:(x,1)) rdd=rdd.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) rdd =rdd.mapValues(lambda x:x[0]//x[1])rdd.collect() rdd.foreach(print)
共享变量
有时候,需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)
广播变量
广播变量用来把变量在所有节点的内存之间进行共享。可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下
broadcastVar = sc.broadcast([1, 2, 3]) broadcastVar.value
累加器
累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生地支持数值型(numeric)的累加器,一个数值型的累加器,可以通过调用SparkContext.accumulator()来创建
accum = sc.accumulator(0) sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x)) accum.value
作者:dpengwang
链接:https://www.jianshu.com/p/a6afd64feb4d