手记

1 Spark Streaming 透彻理解之一

本文内容基于Spark最新版1.6.1

  1. Spark 最初只有Spark Core,通过逐步的发展,现在已扩展出Spark SQL、Spark Streaming、Spark MLlib(machine learning)、GraphX(graph)、Spark R等。 而Spark Streaming本是Spark Core上的一个子框架,如果我们试着去精通这个子框架,不仅仅能写出非常复杂的应用程序,还能够很好的驾驭Spark,进而研究并达到精通Spark的地步,及其寻找到Spark问题的解决之道。

  2. 我们为什么从Spark Streaming切入研究Spark源码的定制,因为Spark SQL涉及到很多SQL语法解析和优化的细节,对于我们集中精力研究Spark有所干扰;Spark R还不是很成熟,支持功能有限;GraphX最近几个版本基本没有改进,里面有许多数学算法;MLlib也涉及到相当多的数学知识。

  3. Spark Streaming的优势是在于可以结合SparkSQL、图计算、机器学习,使其功能更加强大。同时在Spark中Spark Streaming也是最容易出现问题的,因为它是不断的运行,内部比较复杂。掌握好Spark Streaming,可以去窥视Spark的一切!

  4. Spark Streaming到底是什么? Spark Streaming是一个流式计算框架,运行在Spark Core之上。这是一个流处理的时代,一切数据如果不是以流式来处理或者跟流式的处理不相关的话,都将是次数据,我们必将处在一个流的数据处理时代。Spark Streaming很像是基于Spark Core之上的一个应用程序。不像其他子框架,比如机器学习是把数学算法直接应用在Spark的RDD之上,Spark Streaming更像一般的应用程序那样,感知流进来的数据并进行相应的处理。很像顺其自然的一种感知操作,利用自己独有的“神经元”来对数据进行各类操作。

  5. Spark Streaming的几大优点

  • 对源源不断流进来的数据,能够迅速响应并立即给出你所要是反馈信息

  • Spark非常强大的地方在于它的流式处理可以在线的利用机器学习、图计算、Spark SQL或者Spark R的成果,这得益于Spark多元化、一体化的基础架构设计。也就是说,在Spark技术堆栈中,Spark Streaming可以调用任何的API接口,不需要做任何的设置。这是Spark无可匹敌之处,也是Spark Streaming必将一统天下的根源。

  1. 如何清晰的看到数据的流入、被处理的过程? 使用一个小技巧,通过调节放大Batch Interval的方式,来降低批处理次数,以方便看清楚各个环节。
    我们从已写过的广告点击的在线黑名单过滤的Spark Streaming应用程序入手,看一下是具体的实验源码:

import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/**
 * 使用Scala开发集群运行的Spark 在线黑名单过滤程序
 *
 * 背景描述:在广告点击计费系统中,我们在线过滤掉黑名单的点击,
进而保护广告商的利益,只进行有效的广告点击计费
 *  或者在防刷评分(或者流量)系统,过滤掉无效的投票或者评分或者流量;
 * 实现技术:使用transform Api直接基于RDD编程,进行join操作
 *
 */object OnlineBlackListFilter {
  def main(args: Array[String]){    /**
     * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
     * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
     * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
     * 只有1G的内存)的初学者       *
     */
    val conf = new SparkConf() //创建SparkConf对象
    conf.setAppName("OnlineBlackListFilter") //设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("spark://Master:7077") //此时,程序在Spark集群
    val ssc = new StreamingContext(conf, Seconds(30))    /**
     * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,黑名单的生成往往有复杂的业务
     * 逻辑,具体情况算法不同,但是在Spark Streaming进行处理的时候每次都能工访问完整的信息
     */
    val blackList = Array(("hadoop", true),("mahout", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
    val adsClickStream = ssc.socketTextStream("Master", 9999)    /**
     * 此处模拟的广告点击的每条数据的格式为:time、name
     * 此处map操作的结果是name、(time,name)的格式
     */
    val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
    adsClickStreamFormatted.transform(userClickRDD => {      //通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,又获得了相应点击内容是否在黑名单中
      val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)      /**
       * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean))
       * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在在值
       * 如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容;
       */
      val validClicked = joinedBlackListRDD.filter(joinedItem => {        if(joinedItem._2._2.getOrElse(false))
        {          false
        } else {          true
        }
      })
      validClicked.map(validClick => {validClick._2._1})
    }).print    /**
     * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
     */
    ssc.start()
    ssc.awaitTermination()
  }
}

集群中需要先执行nc,启动 9999端口

nc -lk 9999

将代码打包上传到集群运行

  1. 我们运行完程序,看到过滤结果以后,停止程序,打开HistoryServer http://master:18080/


  2. 点击App ID进去,打开,会看到如下图所示的4个Job,从实际执行的Job是1个Job,但是图中显示有4个Job,从这里可以看出Spark Streaming运行的时候自己会启动一些Job。



    先看看job id 为0 的详细信息


  3. 很明显是我们定义的blackListRDD数据的生成。对应的代码为

val blackList = Array((“Hadoop”, true), (“Mathou”, true)) 
//把Array变成RDD val blackListRDD = ssc.sparkContext.parallelize(blackList)

并且它做了reduceBykey的操作(代码中并没有此步操作,SparkStreaming框架自行生成的)。
这里有两个Stage,Stage 0和Stage 1

Job 1的详细信息



一个makeRDD,这个RDD是receiver不断的接收数据流中的数据,在时间间隔达到batchInterval后,将所有数据变成一个RDD。并且它的耗时也是最长的59s

  1. 此处可以看出,receiver也是一个独立的job。由此我们可以得出一个结论:我们在应用程序中,可以启动多个job,并且不用的job之间可以相互配合,这就为我们编写复杂的应用程序打下了基础。
    我们点击上面的start at OnlineBlackListFilter.scala:64查看详细信息

  2. 根据上图的信息,只有一个Executor在接收数据,最最重要的是红色框中的数据本地性为PROCESS_LOCAL,由此可以知道receiver接收到数据后会保存到内存中,只要内存充足是不会写到磁盘中的
    即便在创建receiver时,指定的存储默认策略为

MEMORY_AND_DISK_SER_2 
def socketTextStream( 
hostname: String, 
port: Int, 
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
): ReceiverInputDStream[String] = withNamedScope(“socket text stream”) { 
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) 
}
  1. job 2的详细信息




    Job 2 将前两个job生成的RDD进行leftOuterJoin操作。
    从Stage Id的编号就可以看出,它是依赖于上两个Job的。
    Receiver接收数据时是在spark-master节点上,但是Job 2在处理数据时,数据已经到了spark-worker1上了(因为我的环境只有两个worker,数据并没有分散到所有worker节点,worker节点如果多一点,情况可能不一样,每个节点都会处理数据)
    点击上面的Stage Id 3查看详细信息:



    Executor上运行,并且有5个Task 。
    Job 3的详细信息


  2. 总结:我们可以看出,一个batchInterval并不是仅仅触发一个Job。
    根据上面的描述,我们更细致的了解了DStream和RDD的关系了。DStream就是一个个batchInterval时间内的RDD组成的。只不过DStream带上了时间维度,是一个无边界的集合。



    以上的连续4个图,分别对应以下4个段落的描述:
    Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各种地方。
    Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。
    Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。
    Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。
    在我们前面的实验中,每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。
    DStream是一个没有边界的集合,没有大小的限制。
    DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。
    锁定到时间片后,就是空间的操作,也就是对本时间片的对应批次的数据的处理。
    下面用实例来讲解数据处理过程。
    从Spark Streaming程序转换为Spark执行的作业的过程中,使用了DStreamGraph。
    Spark Streaming程序中一般会有若干个对DStream的操作。DStreamGraph就是由这些操作的依赖关系构成。

对DStream的操作会构建成DStream Graph



从每个foreach开始,都会进行回溯。从后往前回溯这些操作之间的依赖关系,也就形成了DStreamGraph。
在每到batchInterval时间间隔后,Job被触发,DStream Graph将会被转换成RDD Graph



空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。



作者:海纳百川_spark
链接:https://www.jianshu.com/p/8de6ec8513ca

0人推荐
随时随地看视频
慕课网APP