Containers running on YARN fall into two categories: the ApplicationMaster, which is the first container started for every YARN application, and the containers that run the user's tasks.
ApplicationMaster process startup
1. The YARN client submits the application to YARN. (step 1 in the figure above)
To submit an application, the client calls two API methods on org.apache.hadoop.yarn.client.api.YarnClient:
public abstract YarnClientApplication createApplication() throws YarnException, IOException;
public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException;
createApplication talks to the ResourceManager process over RPC (rmClient.getNewApplication(request)) to have it allocate a new application. The result is returned in a GetNewApplicationResponse entity, which contains the ApplicationId and the maximum resources the cluster can allocate; createApplication wraps this result in a YarnClientApplication entity.
After obtaining the YarnClientApplication, the client fills in its context object, org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext, including the application name, resources, queue, priority, and the ApplicationMaster launch command (carried in a ContainerLaunchContext entity; ordinary containers are launched with the same entity). Finally it calls the second method above, submitApplication, which sends the ApplicationSubmissionContext to the ResourceManager (rmClient.submitApplication(request)).
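The two-step handshake above (get a fresh id, then submit the filled-in context) can be sketched as a toy model. The classes and methods below are invented stand-ins for illustration, not the real YarnClient or ResourceManager code:

```java
import java.util.HashMap;
import java.util.Map;

public class SubmitDemo {
    // Invented stand-in for the RM side of the two-step handshake.
    static class MiniRM {
        private int nextId = 1;
        // submitted applications, keyed by application id
        final Map<Integer, String> submitted = new HashMap<>();

        // Step 1: like rmClient.getNewApplication(request) -- the RM hands
        // out a fresh application id before anything is submitted.
        int createApplication() {
            return nextId++;
        }

        // Step 2: like rmClient.submitApplication(request) -- the client
        // sends the filled-in submission context (reduced here to just the
        // AM launch command).
        void submitApplication(int appId, String amLaunchCommand) {
            submitted.put(appId, amLaunchCommand);
        }
    }

    public static void main(String[] args) {
        MiniRM rm = new MiniRM();
        int appId = rm.createApplication();   // obtain the id first
        rm.submitApplication(appId, "java SomeApplicationMaster");
        System.out.println("submitted application " + appId);
    }
}
```

The point of the split is that the client learns its ApplicationId (and the cluster's resource ceiling) before it builds and submits the launch context.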
2. On receiving the request, the ResourceManager selects a NodeManager in the cluster, allocates a container for the application on it, and starts the application's ApplicationMaster inside that container. (step 2 in the figure)
The corresponding handling on the ResourceManager side:
1) org.apache.hadoop.yarn.server.resourcemanager.ClientRMService handles all RPC requests to the RM; application submission is handled by its submitApplication method.
2) org.apache.hadoop.yarn.server.resourcemanager.RMAppManager#submitApplication first instantiates RMAppImpl, whose construction initializes its state machine (around line 155 of RMAppImpl; see reference 2),
and finally calls:
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(applicationId, RMAppEventType.START));
This uses the asynchronous-dispatcher and state-machine patterns; the handler is registered in ResourceManager#serviceInit:
rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext));
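The register-then-dispatch pattern can be reduced to a small self-contained sketch. YARN's real implementation is org.apache.hadoop.yarn.event.AsyncDispatcher, which registers handlers per event-type enum class and dispatches on a dedicated thread; the class below is invented for illustration and keys handlers by the event object's class instead:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

public class MiniDispatcher {
    private final Queue<Object> queue = new ArrayDeque<>();
    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

    // like rmDispatcher.register(RMAppEventType.class, handler)
    public void register(Class<?> eventClass, Consumer<Object> handler) {
        handlers.put(eventClass, handler);
    }

    // like getEventHandler().handle(event): only enqueues, returns immediately
    public void handle(Object event) {
        queue.add(event);
    }

    // one step of the dispatch loop (YARN runs this loop on its own thread)
    public void dispatchOne() {
        Object event = queue.poll();
        if (event != null) {
            handlers.get(event.getClass()).accept(event);
        }
    }

    public static void main(String[] args) {
        MiniDispatcher d = new MiniDispatcher();
        d.register(String.class, e -> System.out.println("handled: " + e));
        d.handle("RMAppEventType.START"); // asynchronous: queued, not yet handled
        d.dispatchOne();                  // now the registered handler runs
    }
}
```

The decoupling matters: submitApplication returns as soon as the START event is queued; the state-machine transition happens later on the dispatcher thread.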
3) ApplicationMaster launch:
As noted above, RMApp has a state machine. Within the RM, one RMApp corresponds to one or more RMAppAttempts: if the first RMAppAttempt fails, the RM starts a new RMAppAttempt according to the retry configuration. See reference 3 for the RMAppAttempt state machine.
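The one-RMApp-to-many-RMAppAttempts relationship can be modeled in a few lines. The class below is invented for illustration; in real YARN the retry limit comes from the yarn.resourcemanager.am.max-attempts setting:

```java
import java.util.ArrayList;
import java.util.List;

public class RetryDemo {
    static class MiniApp {
        final int maxAttempts;
        final List<Integer> attemptIds = new ArrayList<>();

        MiniApp(int maxAttempts) { this.maxAttempts = maxAttempts; }

        // plays the role of createAndStartNewAttempt: a new attempt is
        // created on first acceptance or after a failure, until retries
        // run out
        boolean createAndStartNewAttempt() {
            if (attemptIds.size() >= maxAttempts) {
                return false; // no retries left: the application itself fails
            }
            attemptIds.add(attemptIds.size() + 1);
            return true;
        }
    }

    public static void main(String[] args) {
        MiniApp app = new MiniApp(2);
        app.createAndStartNewAttempt();                   // first attempt
        boolean retried = app.createAndStartNewAttempt(); // after ATTEMPT_FAILED
        boolean third = app.createAndStartNewAttempt();   // attempts exhausted
        System.out.println(retried + " " + third);        // true false
    }
}
```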
Source for RMApp starting an RMAppAttempt:
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl#createAndStartNewAttempt (triggered when the RMApp state machine reaches APP_ACCEPTED or receives ATTEMPT_FAILED) fires the START event of the RMAppAttempt state machine. When the attempt reaches the ALLOCATED state, the RM's ApplicationMasterLauncher contacts the corresponding NodeManager to start the ApplicationMaster, after which the attempt is moved to LAUNCHED. (The resource-allocation step is omitted here.)
Code references:
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.AttemptStoredTransition,
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl#launchAttempt,
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher
The state-machine transitions:
// Transitions from ALLOCATED_SAVING State
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
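Each addTransition call above adds one entry to a (state, event) → next-state lookup table. A toy equivalent covering just the two quoted transitions (the class itself is invented, not YARN's StateMachineFactory):

```java
import java.util.HashMap;
import java.util.Map;

public class MiniStateMachine {
    enum State { ALLOCATED_SAVING, ALLOCATED, LAUNCHED }
    enum Event { ATTEMPT_NEW_SAVED, LAUNCHED }

    private final Map<String, State> table = new HashMap<>();
    private State current;

    MiniStateMachine(State initial) { current = initial; }

    // chainable, mirroring StateMachineFactory#addTransition
    MiniStateMachine addTransition(State from, State to, Event on) {
        table.put(from + "/" + on, to);
        return this;
    }

    // look up (current, event); unknown pairs are invalid transitions
    State doTransition(Event event) {
        State next = table.get(current + "/" + event);
        if (next == null) {
            throw new IllegalStateException("invalid event " + event + " at " + current);
        }
        current = next;
        return current;
    }

    public static void main(String[] args) {
        MiniStateMachine attempt = new MiniStateMachine(State.ALLOCATED_SAVING)
            .addTransition(State.ALLOCATED_SAVING, State.ALLOCATED, Event.ATTEMPT_NEW_SAVED)
            .addTransition(State.ALLOCATED, State.LAUNCHED, Event.LAUNCHED);
        System.out.println(attempt.doTransition(Event.ATTEMPT_NEW_SAVED)); // ALLOCATED
        System.out.println(attempt.doTransition(Event.LAUNCHED));          // LAUNCHED
    }
}
```

YARN's real table additionally attaches a transition action (e.g. AttemptStoredTransition) to each entry, which runs as a side effect of the transition.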
The RM communicates with the NM to actually start the ApplicationMaster process, then moves the state machine to LAUNCHED:
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher#run
try {
  LOG.info("Launching master" + application.getAppAttemptId());
  launch();
  handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
      RMAppAttemptEventType.LAUNCHED));
} catch (Exception ie) {
  // (exception handling elided)
}
The launch() method is where the ApplicationMaster is actually started; the key code:
StartContainersResponse response =
containerMgrProxy.startContainers(allRequests);
Here containerMgrProxy is the RPC protocol the AM normally uses to start and stop containers on an NM and query their status. Because the ApplicationMaster is itself a special container whose launch is initiated by the RM, this particular call is RM-to-NM communication.
3. The ApplicationMaster requests resources from the RM and asks NMs to start containers (steps 3 and 4 in the figure)
The ApplicationMaster is the first container the RM starts for a YARN application; it then drives the whole application run, including requesting, starting, killing, and checking the status of containers. The ApplicationMaster is application-level code: the YARN framework does not provide its implementation (for historical reasons, YARN does ship an ApplicationMaster implementation for MapReduce), so users must supply their own. Take Spark's ApplicationMaster as an example.
The resource-request flow:
Submit the asks: send the resource requests to the RM via heartbeat;
Obtain containers: receive the granted containers via heartbeat;
For each container obtained, communicate with the NM to start it.
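The three steps above can be sketched as a heartbeat loop. The real API is ApplicationMasterProtocol#allocate (usually via AMRMClient); everything below is an invented simplification in which the "RM" grants at most two containers per heartbeat:

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatDemo {
    // Stand-in for the RM's allocate(): grants up to grantPerBeat containers
    // per heartbeat, numbering them sequentially.
    static class MiniRM {
        private int nextContainerId = 1;

        List<Integer> allocate(int outstanding, int grantPerBeat) {
            List<Integer> granted = new ArrayList<>();
            for (int i = 0; i < Math.min(outstanding, grantPerBeat); i++) {
                granted.add(nextContainerId++);
            }
            return granted;
        }
    }

    // Heartbeat until every requested container has been granted; returns
    // the containers "launched" (here just collected) in arrival order.
    static List<Integer> runAM(MiniRM rm, int wanted) {
        List<Integer> launched = new ArrayList<>();
        int outstanding = wanted;
        while (outstanding > 0) {
            // 1. heartbeat: send the current ask, receive new grants
            List<Integer> granted = rm.allocate(outstanding, 2);
            outstanding -= granted.size();
            // 2. for each granted container, the real AM would now contact
            //    the NM (nmClient.startContainer) to launch it
            launched.addAll(granted);
        }
        return launched;
    }

    public static void main(String[] args) {
        System.out.println(runAM(new MiniRM(), 5)); // [1, 2, 3, 4, 5]
    }
}
```

Note the asynchrony this models: a grant usually arrives on a later heartbeat than the ask, so the AM keeps looping until its outstanding request count reaches zero.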
The container launch flow; the call chain is:
org.apache.spark.deploy.yarn.ApplicationMaster#runExecutorLauncher
org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
org.apache.spark.deploy.yarn.ExecutorRunnable#run
org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer

try {
  nmClient.startContainer(container.get, ctx)
} catch {
  // (exception handling elided)
}

Here nmClient is the NM client class; in org.apache.hadoop.yarn.client.api.impl.NMClientImpl#startContainer:

StartContainersResponse response =
    proxy.getContainerManagementProtocol().startContainers(allRequests);
This call starts the container by talking to the NM over ContainerManagementProtocol.
The corresponding code on the NM side:
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl#startContainers
The main class of the container process is CoarseGrainedExecutorBackend. (Where is this class name specified? It appears in the container launch command that ExecutorRunnable builds.)
How is the container's launch status reported back to the AM?
4. The driver performs DAG splitting and task splitting, then dispatches tasks to executors. (step 5 in the figure)
The YarnClusterScheduler shown in the figure is the TaskScheduler used in YARN cluster mode. It extends YarnScheduler, which in turn extends TaskSchedulerImpl; YarnClusterScheduler and YarnScheduler add almost nothing of their own, and the main logic lives in TaskSchedulerImpl, which is instantiated in SparkContext (see the Spark source notes, "Section 5: TaskScheduler").
Driver side:
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks
Executor side: registering with the driver and receiving dispatched tasks:
org.apache.spark.executor.CoarseGrainedExecutorBackend#receive handles the following message types:
RegisteredExecutor (successfully registered with the driver),
RegisterExecutorFailed (registration with the driver failed),
LaunchTask (a task dispatched by the driver to the executor),
KillTask (kill a task),
StopExecutor (stop the executor; delegates to Shutdown),
Shutdown (stop the executor)
If the message is RegisteredExecutor, the backend has successfully registered with the driver, and it creates the Executor instance:
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
If the message is LaunchTask, it carries a task dispatched by the driver, and the executor's launchTask method is called:
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskDesc)
  }
Each task actually runs on a thread from a thread pool, as in the following code:
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
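The same bookkeeping pattern, reduced to plain Java (the names below are invented; the wrapping Runnable plays the role Spark's TaskRunner plays above):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LaunchTaskDemo {
    // running tasks, keyed by task id (like Executor's runningTasks)
    static final Map<Long, Runnable> runningTasks = new ConcurrentHashMap<>();
    static final ExecutorService threadPool = Executors.newCachedThreadPool();

    static void launchTask(long taskId, Runnable body) {
        Runnable tr = () -> {
            try {
                body.run();
            } finally {
                runningTasks.remove(taskId); // finished: drop from the running set
            }
        };
        runningTasks.put(taskId, tr); // 1. record it as running
        threadPool.execute(tr);       // 2. hand it to the pool
    }

    public static void main(String[] args) throws Exception {
        launchTask(1L, () -> System.out.println("task 1 ran on " + Thread.currentThread().getName()));
        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

launchTask returns immediately; the map is what lets a later KillTask message find and interrupt a task that is still running.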
Author: JX907
Link: https://www.jianshu.com/p/4d753ba9a8b2