手记

Spark之abort stage

NOTE:本文要求读者对spark的运行原理有基本的了解。

需要明确的一点是,abort不同于fail。如果一个stage fail了,那么它还有可能被resubmit,然后重新执行。而如果一个stage abort了,则将无法再次执行。

首先,为什么要abort一个stage?
显然,如果spark认为即使执行该stage,也会以失败告终,那么就没必要继续执行了。此时,就会abort该stage。

其次,什么时候abort一个stage?
主要有两种情况:一、在stage提交tasks过程中,发生了错误;二、 该stage多次执行失败。
具体的有:

  1. 如果没有一个active的job需要该stage,则abort:

abortStage(stage, "No active job for stage " + stage.id, None)
  1. 在该stage的submitMissingTasks过程中:

a) 如果tasks创建失败,则abort:

 abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))

b) 如果tasks没有序列化,则abort:

abortStage(stage, "Task not serializable: " + e.toString, Some(e))

c) 如果tasks序列化失败,则abort:

abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
  1. 如果该stage中的某个task的失败次数超过阈值,则abort该task所在的TaskSetManager,进而导致该stage abort:

if (numFailures(index) >= maxTaskFailures) {
  logError("Task %d in stage %s failed %d times; aborting job".format(
  index, taskSet.id, maxTaskFailures))  abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
  .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)   return}
  1. 如果一个stage因FetchFailed连续失败次数超过阈值,则abort:

val shouldAbortStage = failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttemptsif (shouldAbortStage) {
     s"""$failedStage (${failedStage.name})
     |has failed the maximum allowable number of
     |times: $maxConsecutiveStageAttempts.
     |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
 }
 abortStage(failedStage, abortMessage, None)

What's more:
当然,还有很多TaskSchedulerImpl在处理tasks过程中产生的异常,引起的TaskSetManager的abort,进而导
致stage的abort的情况。在此不再赘述。个人觉得,主要的还是关注stage因多次执行失败,而造成的abort比较重要。

最后,abort一个stage会有什么影响?
显然,一个stage abort,意味着那些依赖于该stage的active jobs都会执行失败。所以,我们需要fail这些jobs。

而如果我们要fail一个job,那么,也会fail掉(并标记为结束)该job中的所有stages。但是,事实上,并不是一个job中的所有stages都会被fail。这里有个条件:如果该stage只被这一个job所依赖,才能fail掉该stage。因为如果有多个jobs依赖一个stage,而如果我们fail掉了该stage,就会导致其他需要使用该stage的jobs因为该stage的fail而失败。

这里有个问题,如果需要被abort的stage被多个jobs所依赖,那么根据上述的条件,该stage最终就不能真正的abort了吗?

其实不是这样的。
假设我们有3个jobs(jobA,jobB, jobC)依赖了该需要被abort的stage0。当jobA尝试fail掉它所有依赖的stages时,发现stage0同时被其它2个jobs所依赖,于是放弃fail该stage,转而检查其它的stages。

虽然,最终,jobA没有fail掉所有依赖的stages,比如stage0。但它依然会fail掉自己。当jobA fail的时候,就会清理相关的数据结构。比如,依赖stage0的jobs就只剩下jobB和jobC了。

等到jobB fail它所依赖的stages的时候,发现stage0同时被另一个job所依赖,于是也放弃fail stage0。当jobB fail时,清理完相关数据结构。此时,依赖stage0的job就只剩下jobC了。

那么,等到jobC fail它所依赖的stages的时候,发现stage0此时只有一个job依赖,那就是jobC自己。既然我jobC就要fail了,那么留着stage0显然也没有什么用了。如果该stage0正在running,显然是一种资源的浪费。所以这种情况下,还要kill掉其中正在运行的tasks。最后,jobC才fail掉自己,并再次清理相关数据结构。

最终,stage0也就被fail掉了。



作者:找不到工作的_Ngone
链接:https://www.jianshu.com/p/180b21b3e0a0


0人推荐
随时随地看视频
慕课网APP