This article uses the NetworkWordCount example. The code is as follows:
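```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    // local[2]: one thread runs the receiver, the other processes the batches
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    // Batch interval of 1 second
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Receive lines of text from hostname:port
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // Split each line into words on spaces
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```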
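To build intuition for what a single batch computes, here is a plain-Scala sketch with no Spark dependency. It applies the same flatMap, map and reduce-by-key steps to an in-memory Seq[String] that stands in for one second of socket input. The object name BatchWordCount is hypothetical, and groupBy plus reduce replaces the shuffle that reduceByKey actually performs.

```scala
// Hypothetical, Spark-free model of what one batch of NetworkWordCount computes.
object BatchWordCount {
  // Mirrors lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  // using ordinary Scala collections instead of RDDs.
  def countWords(batch: Seq[String]): Map[String, Int] = {
    val words = batch.flatMap(_.split(" "))      // flatMap step
    val pairs = words.map(x => (x, 1))           // map step
    pairs.groupBy(_._1).map { case (w, ps) =>    // reduceByKey(_ + _) step
      w -> ps.map(_._2).reduce(_ + _)
    }
  }
}
```

Note the separator. Splitting on " " yields words. Splitting on the empty string "" would break each line into single characters.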
Starting from ssc.socketTextStream, let's work through the DStream dependencies step by step. ssc.socketTextStream is defined as:
```scala
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
```
Step 1: it calls socketStream, which creates a SocketInputDStream:
```scala
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
```
So val lines = ssc.socketTextStream(...) returns the lines DStream, which is a SocketInputDStream.
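socketTextStream passes SocketReceiver.bytesToLines as the converter. That converter turns the socket's InputStream into an Iterator[String]. The sketch below has the same shape, (InputStream) => Iterator[String], and is built on java.io.BufferedReader. It is an assumption-level stand-in, not Spark's code, and the object name LineConverter is hypothetical.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for SocketReceiver.bytesToLines.
object LineConverter {
  // Same shape as socketStream's converter parameter: (InputStream) => Iterator[T], T = String.
  def bytesToLines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    // Lines are read lazily; readLine() returns null at end of stream, which ends the iterator.
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```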
Step 2: flatMap creates a FlatMappedDStream:
```scala
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
```
This returns the words DStream.
Step 3: map returns a MappedDStream:
```scala
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
```
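So far, each transformation has only created a new DStream object. That object remembers its parent and the function (after closure cleaning through sparkContext.clean), and no data has been touched. The toy classes below model just this construction step, so the chain can be walked from the last node back to the source. ToyDStream, ToySource, ToyFlatMapped and ToyMapped are all hypothetical, not Spark's classes, and they do no closure cleaning.

```scala
// Hypothetical, minimal model of how DStream transformations build a dependency chain.
abstract class ToyDStream[T] {
  def name: String
  def dependencies: List[ToyDStream[_]]

  // Like DStream.flatMap/map: wrap `this` in a new node; nothing is computed here.
  def flatMap[U](f: T => Iterable[U]): ToyDStream[U] = new ToyFlatMapped(this, f)
  def map[U](f: T => U): ToyDStream[U] = new ToyMapped(this, f)

  // Follow parent links from this node back to the source.
  def lineage: List[String] = name :: dependencies.flatMap(_.lineage)
}

class ToySource(val name: String) extends ToyDStream[String] {
  def dependencies: List[ToyDStream[_]] = Nil
}

class ToyFlatMapped[T, U](parent: ToyDStream[T], f: T => Iterable[U]) extends ToyDStream[U] {
  val name = "FlatMappedDStream"
  def dependencies: List[ToyDStream[_]] = List(parent)
}

class ToyMapped[T, U](parent: ToyDStream[T], f: T => U) extends ToyDStream[U] {
  val name = "MappedDStream"
  def dependencies: List[ToyDStream[_]] = List(parent)
}
```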
Then reduceByKey is called:
```scala
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
  reduceByKey(reduceFunc, defaultPartitioner())
}
```
It delegates to another reduceByKey overload:
```scala
def reduceByKey(
    reduceFunc: (V, V) => V,
    partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
  combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
}
```
That overload in turn calls combineByKey:
```scala
def combineByKey[C: ClassTag](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
  val cleanedCreateCombiner = sparkContext.clean(createCombiner)
  val cleanedMergeValue = sparkContext.clean(mergeValue)
  val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
  new ShuffledDStream[K, V, C](
    self,
    cleanedCreateCombiner,
    cleanedMergeValue,
    cleanedMergeCombiner,
    partitioner,
    mapSideCombine)
}
```
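This returns a ShuffledDStream. Together, the operations above form a chain of DStream dependencies. The DStream dependency graph is really a template for the RDD dependency graph, and it stays closely aligned with RDD operations in Spark Core. Below is a diagram of the runtime dependencies:

[Figure: runtime DStream dependency graph]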
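reduceByKey ends up as combineByKey with createCombiner set to the identity function and both mergeValue and mergeCombiner set to reduceFunc. The sketch below reproduces those three roles on plain Scala collections, assuming a Seq of Seqs stands in for the partitions. mergeValue folds records inside each partition (the map-side combine), and mergeCombiners merges the per-partition results for the same key (what happens after the shuffle). ToyCombine and its methods are hypothetical, and neither the partitioner nor the actual shuffle is modeled.

```scala
// Hypothetical model of combineByKey semantics with mapSideCombine = true.
object ToyCombine {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]],
                            createCombiner: V => C,
                            mergeValue: (C, V) => C,
                            mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: fold each partition into one combiner per key.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Reduce side (after the shuffle): merge the combiners that share a key.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }

  // reduceByKey expressed the way the DStream code above does it:
  // combineByKey((v: V) => v, reduceFunc, reduceFunc, ...)
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]], f: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions, (v: V) => v, f, f)
}
```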
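At this point the dependencies are clear, but no RDD has appeared yet. Next, look at wordCounts.print(). Stepping into print() and following the calls leads to new ForEachDStream. ForEachDStream overrides generateJob, which is called from graph.generateJobs(time) in JobGenerator (see part 6 of this series, on dynamic job generation, for the call path). generateJob is implemented as follows: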
```scala
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
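The key point of generateJob is that it runs nothing itself. It captures foreachFunc(rdd, time) in a closure and returns it inside Some(Job), or it returns None when the parent produced no RDD for the batch. The hypothetical stand-in below makes that deferral visible. ToyJob and ToyForEach are not Spark classes, and a Seq replaces the RDD.

```scala
// Hypothetical stand-ins: a "job" is just a batch time plus a deferred function.
final case class ToyJob(time: Long, func: () => Unit) {
  def run(): Unit = func()
}

class ToyForEach[T](parentGetOrCompute: Long => Option[Seq[T]],
                    foreachFunc: (Seq[T], Long) => Unit) {
  // Same shape as ForEachDStream.generateJob: Some(job) if the parent has data, else None.
  def generateJob(time: Long): Option[ToyJob] =
    parentGetOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => foreachFunc(rdd, time) // captured here, executed only by run()
        Some(ToyJob(time, jobFunc))
      case None => None
    }
}
```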
Now let's see how parent.getOrCompute(time) returns an RDD:
```scala
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
```
It first looks up the RDD in generatedRDDs, which is declared as:
```scala
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
```
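generatedRDDs is simply a HashMap keyed by time, with an RDD as the value. The Time keys are aligned with the application's batchDuration, and each DStream stores the RDD it generated for each batch. For the DStream that feeds an output operation, that RDD is the last RDD of the job. Because RDDs record their dependencies, keeping the last RDD is enough to trace back to all the others. If no RDD is cached for the given time, compute(time) is called, and compute(time) is implemented by each subclass.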
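Put differently, getOrCompute is memoization keyed by batch time and guarded by a validity check. The sketch keeps that structure under several simplifications:

- The names ToyCachedStream and computeCalls are hypothetical.
- A mutable.HashMap[Long, Seq[T]] replaces HashMap[Time, RDD[T]].
- isTimeValid is approximated as "later than zeroTime and a whole number of slide durations after it". The slide duration equals the batch duration for non-windowed streams.
- Persistence and checkpointing are left out.

```scala
import scala.collection.mutable

// Hypothetical model of DStream.getOrCompute: at most one computed result per valid batch time.
class ToyCachedStream[T](zeroTime: Long, slideDuration: Long, compute: Long => Option[Seq[T]]) {
  private val generatedRDDs = mutable.HashMap.empty[Long, Seq[T]]
  var computeCalls = 0 // only for observing the memoization

  // Simplified isTimeValid: after zeroTime and aligned to the slide duration.
  private def isTimeValid(time: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slideDuration == 0

  def getOrCompute(time: Long): Option[Seq[T]] =
    generatedRDDs.get(time).orElse {
      if (isTimeValid(time)) {
        computeCalls += 1
        val result = compute(time)
        result.foreach(r => generatedRDDs.put(time, r)) // register for later lookups
        result
      } else None
    }
}
```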
Go back to the very first call, ssc.socketTextStream. It instantiates a SocketInputDStream, which extends ReceiverInputDStream. Here is ReceiverInputDStream's compute method:
```scala
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
```
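The if branch handles fault tolerance. A batch time earlier than the context's start time, which can happen when recovering from a driver failure without a write-ahead log, gets an empty BlockRDD. In the else branch, the key line is createBlockRDD(validTime, blockInfos), which returns the block RDD. blockInfos is the metadata about the data the receiver has received, obtained through ReceiverTracker. Here is createBlockRDD: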
```scala
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
```
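It first checks whether blockInfos is empty. If there is no metadata, the RDD is created with an empty Array. This means blockIds, which are effectively pointers to the data blocks, is empty.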
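The branching in createBlockRDD can be summarized as a small decision function. This sketch models only which kind of RDD gets created, not the RDDs themselves, and it drops the log messages. All of the following names are hypothetical:

- ToyBlockInfo.
- BlockRDDKind and its cases.
- The walEnabled flag, which stands in for WriteAheadLogUtils.enableReceiverLog(ssc.conf).
- The knownToBlockManager predicate, which stands in for blockManager.master.contains.

```scala
// Hypothetical model of the branches in ReceiverInputDStream.createBlockRDD.
final case class ToyBlockInfo(blockId: String, walHandle: Option[String])

sealed trait BlockRDDKind
final case class WALBackedKind(blockIds: Seq[String]) extends BlockRDDKind
final case class PlainBlockKind(blockIds: Seq[String]) extends BlockRDDKind

object ToyCreateBlockRDD {
  def create(infos: Seq[ToyBlockInfo],
             walEnabled: Boolean,
             knownToBlockManager: String => Boolean): BlockRDDKind =
    if (infos.nonEmpty) {
      val ids = infos.map(_.blockId)
      if (infos.forall(_.walHandle.nonEmpty)) WALBackedKind(ids) // every block has a WAL handle
      else PlainBlockKind(ids.filter(knownToBlockManager))      // drop blocks no longer in memory
    } else {
      // No blocks in this batch: an empty RDD of the configured kind.
      if (walEnabled) WALBackedKind(Seq.empty) else PlainBlockKind(Seq.empty)
    }
}
```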
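If there is data, it first checks for WAL (write-ahead log) handles; WAL will be covered later. Going straight to new BlockRDD[T](ssc.sc, validBlockIds), here is the BlockRDD code: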
```scala
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
  val index = idx
}

private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil) {

  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
  @volatile private var _isValid = true

  override def getPartitions: Array[Partition] = {
    assertValid()
    (0 until blockIds.length).map(i => {
      new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
    }).toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val blockManager = SparkEnv.get.blockManager
    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
    blockManager.get(blockId) match {
      case Some(block) => block.data.asInstanceOf[Iterator[T]]
      case None =>
        throw new Exception("Could not compute split, block " + blockId + " not found")
    }
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    assertValid()
    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
  }

  /**
   * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
   * irreversible operation, as the data in the blocks cannot be recovered back
   * once removed. Use it with caution.
   */
  private[spark] def removeBlocks() {
    blockIds.foreach { blockId =>
      sparkContext.env.blockManager.master.removeBlock(blockId)
    }
    _isValid = false
  }

  /**
   * Whether this BlockRDD is actually usable. This will be false if the data blocks have been
   * removed using `this.removeBlocks`.
   */
  private[spark] def isValid: Boolean = {
    _isValid
  }

  /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
  private[spark] def assertValid() {
    if (!isValid) {
      throw new SparkException(
        "Attempted to use %s after its blocks have been removed!".format(toString))
    }
  }

  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
    _locations
  }
}
```
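This compute returns an iterator, just as when reading data from HDFS. The reference to the data block is wrapped in an iterator, and compute is only invoked when an action triggers a job.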
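Stripped down, BlockRDD consists of one partition per block id plus a compute method. compute looks the block up and hands back its iterator, and it fails if the block is gone. In this hypothetical sketch, ToyBlockStoreRDD and ToyBlockPartition are not Spark classes, and a String => Option[Iterator[T]] function plays the role of the BlockManager. compute only returns the iterator; records are pulled when a consumer iterates over it.

```scala
// Hypothetical model of BlockRDD: partitions are block ids, data lives in a block store.
final case class ToyBlockPartition(index: Int, blockId: String)

class ToyBlockStoreRDD[T](blockIds: Array[String], blockStore: String => Option[Iterator[T]]) {
  // Like getPartitions: one partition per block id.
  def partitions: Array[ToyBlockPartition] =
    blockIds.indices.map(i => ToyBlockPartition(i, blockIds(i))).toArray

  // Like compute: return the block's data as an iterator, or fail if the block was removed.
  def compute(split: ToyBlockPartition): Iterator[T] =
    blockStore(split.blockId) match {
      case Some(data) => data
      case None =>
        throw new NoSuchElementException(s"Could not compute split, block ${split.blockId} not found")
    }
}
```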
Now look again at these two lines:
```scala
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
```
First, FlatMappedDStream's compute method:
```scala
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
```
It gets the RDD from the parent DStream (here, the BlockRDD), applies flatMap to it, and returns an Option[RDD[U]].
Next, MappedDStream's compute method:
```scala
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
```
Likewise, it gets the RDD from the parent DStream, applies map, and returns an Option[RDD[U]].
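ShuffledDStream works the same way. Its compute gets the parent's RDD and calls combineByKey on it, which yields a ShuffledRDD.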
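The flatMap and map cases share one pattern. Each asks the parent for its RDD at validTime and, if there is one, applies the RDD transformation to it; a None from the parent simply propagates through Option.map. Below is a minimal sketch that uses a Seq in place of the RDD and a Long in place of Time. ToyCompute and its methods are hypothetical.

```scala
// Hypothetical model of the compute pattern shared by FlatMappedDStream and MappedDStream.
object ToyCompute {
  type Batch[T] = Seq[T] // stands in for RDD[T]

  // FlatMappedDStream.compute: parent.getOrCompute(t).map(_.flatMap(f))
  def flatMapped[T, U](parent: Long => Option[Batch[T]], f: T => Iterable[U]): Long => Option[Batch[U]] =
    t => parent(t).map(_.flatMap(f))

  // MappedDStream.compute: parent.getOrCompute(t).map(_.map(f))
  def mapped[T, U](parent: Long => Option[Batch[T]], f: T => U): Long => Option[Batch[U]] =
    t => parent(t).map(_.map(f))
}
```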
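Seen from this angle, the DStream dependencies are exactly the RDD dependencies, which is why a DStream is described as a template for RDDs. Here is the printed RDD lineage: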
```
(2) ShuffledRDD[4] at reduceByKey at NetworkWordCount.scala:25 []
 +-(0) MapPartitionsRDD[3] at map at NetworkWordCount.scala:25 []
    |  MapPartitionsRDD[2] at flatMap at NetworkWordCount.scala:24 []
    |  BlockRDD[1] at socketTextStream at NetworkWordCount.scala:21 []
```
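A printout like this is produced by walking the dependencies from the last RDD back to the source. The same walk is why keeping only the final RDD of a batch is enough. Below is a hypothetical sketch of that walk. ToyRDD is not Spark's class, and the format is simplified: it omits toDebugString's partition counts and shuffle markers.

```scala
// Hypothetical node in an RDD lineage: a label plus its parent dependencies.
final case class ToyRDD(label: String, deps: List[ToyRDD] = Nil) {
  // Depth-first walk from this RDD back to its sources, one indented line per RDD.
  def lineage(depth: Int = 0): List[String] =
    ("  " * depth + label) :: deps.flatMap(_.lineage(depth + 1))
}
```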
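Job submission itself is handled by the scheduler. The RDD action is therefore actually triggered when Job's run() method, def run() { _result = Try(func()) }, executes, as described in the article on dynamic job generation.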
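run() wraps the call in scala.util.Try. As a result, an exception thrown by the job's function (for example, a block that can no longer be found) is recorded as a Failure instead of propagating out of run(). Below is a hypothetical stand-in for that behavior; ToyStreamingJob is not Spark's Job class.

```scala
import scala.util.Try

// Hypothetical stand-in for streaming's Job: run() stores the outcome instead of throwing.
class ToyStreamingJob[R](val time: Long, func: () => R) {
  private var _result: Option[Try[R]] = None // None until run() has been called

  def run(): Unit = { _result = Some(Try(func())) }

  def result: Try[R] =
    _result.getOrElse(throw new IllegalStateException("Cannot access result before job finishes"))
}
```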
Author: 海纳百川_spark
Link: https://www.jianshu.com/p/0e7e540de15c