How the DAG Is Generated
Overview
Spark is an efficient distributed computing framework, but to understand it in depth you have to read its source code. Doing so not only helps you understand how Spark works internally, it also sharpens your ability to troubleshoot a cluster. This article focuses on the execution flow of Spark's Stage and Task scheduling.
A DAG (Directed Acyclic Graph) is formed as the original RDDs go through a series of transformations. Based on the dependencies between RDDs, the DAG is divided into Stages. For a narrow dependency, the partition transformations are computed entirely within one Stage. For a wide dependency, a shuffle is involved, so the downstream computation can only begin after the parent RDD has been fully processed; wide dependencies are therefore the basis for splitting Stages.
Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.
Wide dependency: multiple partitions of the child RDD depend on the same partition of the parent RDD.
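For example, here is a minimal WordCount lineage showing both kinds of dependency (a sketch assuming the Spark dependency on the classpath; words.txt is a hypothetical input file):

import org.apache.spark.{SparkConf, SparkContext}

object DagDemo extends App {
  val conf = new SparkConf().setAppName("dag-demo").setMaster("local[*]")
  val sc = new SparkContext(conf)

  val counts = sc.textFile("words.txt")  // hypothetical input
    .flatMap(_.split(" "))               // narrow dependency
    .map(word => (word, 1))              // narrow dependency
    .reduceByKey(_ + _)                  // wide dependency: shuffle, stage boundary

  // toDebugString prints the lineage; the indentation changes exactly at
  // the shuffle, i.e. where the DAG is cut into two stages
  println(counts.toDebugString)
  sc.stop()
}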
The DAGScheduler Event Queue
Having walked through how the Executor is created and started, we continue reading the constructor of SparkContext:
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // ......

  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // create the driver-side environment via SparkEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // createSparkEnv is called here; it returns a SparkEnv object holding many
  // important properties, most notably the ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // create the DAGScheduler
  dagScheduler = new DAGScheduler(this)

  // start the TaskScheduler
  taskScheduler.start()

  // ......
}
The constructor also creates a DAGScheduler object; this class is responsible for splitting the job into Stages. Its constructor initializes

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

DAGSchedulerEventProcessLoop is an event-bus object responsible for dispatching events. In the constructor eventProcessLoop.start() is called; this start method comes from the parent class EventLoop:
def start(): Unit = {
  if (stopped.get) {
    throw new IllegalStateException(name + " has already been stopped")
  }
  // Call onStart before starting the event thread to make sure it happens before onReceive
  onStart()
  eventThread.start()
}
start() calls eventThread.start(), launching a thread:
private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}
The run method keeps taking events from the LinkedBlockingDeque blocking queue and calls onReceive(event), which is implemented by the subclass DAGSchedulerEventProcessLoop:
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    // hand the submitted job over to the dagScheduler
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
onReceive matches on the incoming event type and runs the corresponding logic. From this point on, the DAGScheduler's event queue stays alive, continuously polling for events.
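To make this pattern concrete, here is a minimal, self-contained sketch of such an event loop (simplified and with made-up names; it is not Spark's actual EventLoop, which additionally interrupts the blocked thread on stop):

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  // a daemon thread that blocks on the queue and dispatches each event
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        val event = eventQueue.take()   // blocks until something is posted
        try onReceive(event)
        catch { case e: Exception => onError(e) }
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = stopped.set(true)
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Exception): Unit = e.printStackTrace()
}

object MiniEventLoopDemo extends App {
  val loop = new MiniEventLoop[String]("demo-loop") {
    override protected def onReceive(event: String): Unit = println("received: " + event)
  }
  loop.start()
  loop.post("JobSubmitted")   // handled asynchronously by the daemon thread
  Thread.sleep(100)
}

This is the same shape as DAGSchedulerEventProcessLoop: post enqueues, the single event thread dequeues, and onReceive dispatches.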
How the DAG Submits Tasks
After an RDD goes through a series of Transformation methods, an Action method is eventually executed. For example, in a WordCount program the final call to collect() submits the job to the cluster, and the tasks actually run. The execution path of that method is as follows:
/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
sc is the SparkContext object, and collect() invokes runJob on it. The call chain passes through several overloads of runJob and finally reaches the variant below, which calls dagScheduler.runJob:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (stopped) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // here the dagScheduler comes in; it can split the job into stages
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
dagScheduler.runJob is the part we care about most:
def runJob[T, U: ClassTag](
    /* ...... */) {
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    }
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}
What we mainly look at here is submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties), which submits the job:
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // ......

  // put the job onto the event queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}
The key call here is eventProcessLoop.post, which puts a JobSubmitted event onto the event queue. eventProcessLoop is the event-bus object initialized in the constructor; its internal thread keeps polling the queue, and every event it takes is passed to onReceive, which matches on the event type. The event we just posted is JobSubmitted:
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
  // hand the submitted job over to the dagScheduler
  dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
    listener, properties)
This calls handleJobSubmitted; let's look at that method next:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // the final stage
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  // ......
  submitStage(finalStage)
}
The code above calls newStage to split the job. This is the core stage-splitting method: starting from the final RDD's dependencies, it recurses backwards through the lineage and uses every wide (shuffle) dependency as a Stage boundary. Stage splitting is one step of the overall flow and is not covered in detail here, but the sketch below illustrates the idea.
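A simplified, self-contained sketch of the splitting idea (the types are made-up stand-ins, not Spark's RDD/Stage classes): walking the lineage backwards, every shuffle dependency marks the end of a parent stage, while narrow dependencies stay inside the current stage.

object StageSplitDemo extends App {
  sealed trait Dep { def parent: Node }
  case class NarrowDep(parent: Node) extends Dep   // e.g. map, flatMap
  case class ShuffleDep(parent: Node) extends Dep  // e.g. reduceByKey
  case class Node(name: String, deps: Seq[Dep] = Nil)

  // collect the RDDs at which a new (parent) stage must end
  def stageBoundaries(rdd: Node): Set[Node] =
    rdd.deps.flatMap {
      case ShuffleDep(p) => stageBoundaries(p) + p  // wide: p closes a parent stage
      case NarrowDep(p)  => stageBoundaries(p)      // narrow: same stage, keep walking
    }.toSet

  // lineage: textFile -> flatMap -> map -(shuffle)-> reduceByKey
  val textFile = Node("textFile")
  val flatMapped = Node("flatMap", Seq(NarrowDep(textFile)))
  val mapped = Node("map", Seq(NarrowDep(flatMapped)))
  val reduced = Node("reduceByKey", Seq(ShuffleDep(mapped)))

  println(stageBoundaries(reduced))  // Set(Node(map, ...)): one boundary, two stages
}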
Once the stages have been split, execution continues to submitStage(finalStage), which submits the stages recursively:
// submit stages recursively
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // submit the tasks of this stage
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}
submitMissingTasks(stage, jobId.get) is then called to submit the tasks, passing in the Stage and the jobId:
private def submitMissingTasks(stage: Stage, jobId: Int) {
  // ......
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    // the taskScheduler submits the tasks
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    logDebug("Stage " + stage + " is actually done; %b %d %d".format(
      stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
  }
}
What we need to focus on here is taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)). A TaskSet object is created that wraps all the task information: the task list, the stageId, the attempt id, the jobId, and the job properties.
Task Scheduling
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // create a TaskSetManager that holds the taskSet's task list
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    // add the tasks to the scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  // trigger resource offers for the new tasks
  backend.reviveOffers()
}
This method is important: it puts the TaskSetManager into the scheduling pool and finally calls backend.reviveOffers(). Here backend is a CoarseGrainedSchedulerBackend, the scheduler backend that drives the Executors:
override def reviveOffers() {
  // send a message to itself
  driverActor ! ReviveOffers
}
Here the backend uses its internal DriverActor to send a ReviveOffers message to itself; the sketch below illustrates this self-send pattern before we return to how the message is received.
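A minimal sketch of the actor "send a message to yourself" pattern using Akka classic actors (the names are made up for illustration; this is not Spark's DriverActor, and the akka-actor dependency is assumed):

import akka.actor.{Actor, ActorSystem, Props}

case object ReviveOffers

class MiniDriver extends Actor {
  def receive: Receive = {
    // the self-sent message arrives through the normal mailbox
    case ReviveOffers => println("makeOffers() would run here")
  }
  // any code path inside the actor can enqueue work onto its own mailbox
  def reviveOffers(): Unit = self ! ReviveOffers
}

object MiniDriverDemo extends App {
  val system = ActorSystem("demo")
  val driver = system.actorOf(Props[MiniDriver](), "driver")
  driver ! ReviveOffers   // delivered asynchronously
  Thread.sleep(200)
  system.terminate()
}

Sending itself a message instead of calling the method directly keeps all scheduling decisions on the actor's single-threaded mailbox, so no extra locking is needed. Back in the real code, the receive method matches the message: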
case ReviveOffers => makeOffers()
Upon receiving the message, makeOffers() is called:
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
In makeOffers, the information of each registered Executor is wrapped into a WorkerOffer; the list of offers is handed to scheduler.resourceOffers, which matches tasks from the scheduling pool against the free resources, and the resulting task descriptions are passed to launchTasks, shown after the sketch below.
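To illustrate the offer-matching idea, here is a simplified, self-contained sketch (made-up logic, far simpler than Spark's resourceOffers, which also handles locality and delay scheduling):

object OfferDemo extends App {
  case class WorkerOffer(executorId: String, host: String, freeCores: Int)
  val CPUS_PER_TASK = 1

  // assign tasks to offers until either tasks or free cores run out
  def assign(offers: Seq[WorkerOffer], tasks: Seq[String]): Seq[(String, String)] = {
    val freeCores = scala.collection.mutable.Map(offers.map(o => o.executorId -> o.freeCores): _*)
    val assignments = scala.collection.mutable.Buffer.empty[(String, String)]
    var remaining = tasks
    // keep cycling over the offers while tasks and cores remain
    while (remaining.nonEmpty && freeCores.values.exists(_ >= CPUS_PER_TASK)) {
      for (o <- offers if remaining.nonEmpty && freeCores(o.executorId) >= CPUS_PER_TASK) {
        assignments += ((remaining.head, o.executorId))
        freeCores(o.executorId) -= CPUS_PER_TASK
        remaining = remaining.tail
      }
    }
    assignments.toSeq
  }

  println(assign(
    Seq(WorkerOffer("exec-1", "hostA", 2), WorkerOffer("exec-2", "hostB", 1)),
    List("task-0", "task-1", "task-2", "task-3")))
  // -> (task-0,exec-1), (task-1,exec-2), (task-2,exec-1); task-3 must wait
}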
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // ......
    // serialize the task
    val serializedTask = ser.serialize(task)
    // ......
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // send the serialized task to the Executor
    executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
  }
}
launchTasks iterates over the tasks, serializes each one, and sends a LaunchTask message to the corresponding Executor to start it.
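To see what the serialize/deserialize round trip amounts to, here is a sketch using plain Java serialization, which is what Spark's closure serializer uses (the TaskDescription case class below is a stand-in, not Spark's class):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

case class TaskDescription(taskId: Long, name: String)

object SerDemo extends App {
  val task = TaskDescription(42L, "task 42 on stage 0")

  // driver side: serialize the task into a ByteBuffer before sending LaunchTask
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(task)
  out.close()
  val buffer = ByteBuffer.wrap(bytes.toByteArray)

  // executor side: deserialize the received buffer back into a task description
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.array()))
  val restored = in.readObject().asInstanceOf[TaskDescription]
  println(restored)   // TaskDescription(42,task 42 on stage 0)
}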
The Executor's onReceive method:
// the LaunchTask message the DriverActor sends to the Executor
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val ser = env.closureSerializer.newInstance()
    // deserialize the task
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // launch the task
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
When the Executor receives the LaunchTask message from the DriverActor, real work finally begins: the received task is deserialized, and the Executor's launchTask method is called to run it:
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer) {
  // wrap the task description in a TaskRunner
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  // hand the TaskRunner to the thread pool
  threadPool.execute(tr)
}
Inside launchTask, the task description is wrapped in a TaskRunner (a Runnable) and submitted to a thread pool. The TaskRunner's run method executes the logic of every RDD along the lineage our application built. With that, the whole journey of running an RDD job is complete.
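As a recap of this last step, here is a minimal, self-contained sketch of the TaskRunner pattern (simplified names; not Spark's Executor):

import java.util.concurrent.{ConcurrentHashMap, Executors}

object MiniExecutorDemo extends App {
  val threadPool = Executors.newCachedThreadPool()
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  class MiniTaskRunner(taskId: Long) extends Runnable {
    override def run(): Unit = {
      // the real TaskRunner deserializes the task here and runs the
      // RDD lineage's compute logic for its partition
      println("running task " + taskId + " on " + Thread.currentThread().getName)
      runningTasks.remove(taskId)
    }
  }

  def launchTask(taskId: Long): Unit = {
    val tr = new MiniTaskRunner(taskId)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)   // the pool picks a worker thread and calls run()
  }

  (0L to 3L).foreach(launchTask)
  threadPool.shutdown()
}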
Author: 那年的坏人
Link: https://www.jianshu.com/p/a95454e35022