I. How the Worker works, analyzed through two examples: the Worker launching a Driver and launching an Executor
II. Launching a Driver, step by step
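1. When the Master asks the Worker to launch a Driver, it sends a `LaunchDriver(driverId, driverDesc)` message. This is a message case class, not a method: the Worker handles it by pattern matching in its `receive` method. `driverId` identifies the driver, and `driverDesc` (a `DriverDescription`) carries the basic information needed to start it, such as the launch command, memory, and cores.
```scala
case LaunchDriver(driverId, driverDesc) =>
```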
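The sketch below shows this dispatch style. It assumes simplified, hypothetical stand-ins for Spark's deploy messages. The real `LaunchDriver` carries a full `DriverDescription`, and in Spark's RPC layer `receive` is a `PartialFunction[Any, Unit]` that returns nothing. Here `receive` returns a string only so the behavior is easy to check.

```scala
object WorkerReceiveSketch {
  // Hypothetical, simplified stand-ins for Spark's deploy messages
  // (the real case classes carry richer descriptions).
  sealed trait WorkerMessage
  final case class LaunchDriver(driverId: String, mem: Int) extends WorkerMessage
  final case class LaunchExecutor(appId: String, execId: Int, cores: Int, memory: Int)
      extends WorkerMessage

  // Like Worker.receive: dispatch on the message type. No method named
  // LaunchDriver is ever called; the message is matched and destructured.
  def receive(msg: WorkerMessage): String = msg match {
    case LaunchDriver(driverId, mem) =>
      s"launch driver $driverId with ${mem}MB"
    case LaunchExecutor(appId, execId, cores, memory) =>
      s"launch executor $appId/$execId with $cores cores, ${memory}MB"
  }
}
```

Because `WorkerMessage` is sealed, the compiler can warn if a message type is added but not handled in `receive`.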
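2. Handling `LaunchDriver` creates a `DriverRunner` object, which manages the driver process for its whole lifetime. It sets up and runs the process, relaunches it when supervision is enabled, and reports state changes to the Worker, which forwards them to the Master. The Worker then records the runner and starts it.
```scala
// Create a DriverRunner to manage this driver
val driver = new DriverRunner(
  conf,
  driverId,
  workDir,
  sparkHome,
  driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
  self,
  workerUri,
  securityMgr)
// Register the runner, then start it (start() spawns a new thread)
drivers(driverId) = driver
driver.start()
```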
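The `driverDesc.copy(command = ...)` argument uses Scala's case-class `copy` to derive a new description with only the command replaced. The original description is left untouched.

The sketch below illustrates this idiom under several assumptions:
- `Command` and `DriverDescription` are simplified, hypothetical stand-ins for Spark's classes.
- `withLocalSslOpts` is a hypothetical rewrite rule, loosely in the spirit of `Worker.maybeUpdateSSLSettings`. It does not reproduce that method's exact behavior.

```scala
object CopyWithSketch {
  // Simplified, hypothetical stand-ins; Spark's real Command and
  // DriverDescription classes have more fields.
  final case class Command(mainClass: String, arguments: Seq[String], javaOpts: Seq[String])
  final case class DriverDescription(jarUrl: String, mem: Int, cores: Int,
                                     supervise: Boolean, command: Command)

  // Hypothetical rewrite: drop the submitter's -Dspark.ssl.* options and append the
  // worker's own spark.ssl.* settings (sorted so the result is deterministic).
  def withLocalSslOpts(cmd: Command, workerConf: Map[String, String]): Command = {
    val kept = cmd.javaOpts.filterNot(_.startsWith("-Dspark.ssl."))
    val local = workerConf.toSeq.collect {
      case (k, v) if k.startsWith("spark.ssl.") => s"-D$k=$v"
    }.sorted
    cmd.copy(javaOpts = kept ++ local)
  }
}
```

`desc.copy(command = withLocalSslOpts(desc.command, conf))` returns a new description while `desc` itself is unchanged. This is why the rewritten copy can be passed straight into the runner's constructor.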
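3. Inside the thread started by `DriverRunner.start()`, the runner creates the driver's working directory and downloads the user's jar into it:
```scala
// Create the driver's working directory (<workDir>/<driverId>)
val driverDir = createWorkingDirectory()
// Download the user-submitted jar into the working directory
val localJarFilename = downloadUserJar(driverDir)
```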
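The sketch below mirrors these two steps with `java.nio.file`. It assumes the jar is already a local file. `copyUserJar` is a hypothetical stand-in: Spark's `downloadUserJar` fetches the jar from its URL instead of copying a local path.

```scala
import java.nio.file.{Files, Path}

object DriverDirSketch {
  // Create <workDir>/<driverId>, mirroring what createWorkingDirectory() sets up.
  def createWorkingDirectory(workDir: Path, driverId: String): Path = {
    val dir = workDir.resolve(driverId)
    Files.createDirectories(dir) // succeeds even if the directory already exists
    dir
  }

  // Hypothetical local-file stand-in for downloadUserJar: copy the jar into the
  // driver directory, skipping the copy if it is already there, and return its path.
  def copyUserJar(jar: Path, driverDir: Path): Path = {
    val target = driverDir.resolve(jar.getFileName)
    if (!Files.exists(target)) Files.copy(jar, target)
    target
  }
}
```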
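4. The runner then packs the information the Driver needs into a `ProcessBuilder` via `CommandUtils.buildProcessBuilder`. The `substituteVariables` function fills in placeholders such as `{{WORKER_URL}}` and `{{USER_JAR}}` in the command arguments. The runner then launches the driver process with `runDriver`, which waits for the process to exit. If `supervise` is true, `runDriver` relaunches the process after a non-zero exit.
```scala
// Build the ProcessBuilder for the driver command
// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
  driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
// Launch the Driver
runDriver(builder, driverDir, driverDesc.supervise)
```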
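The sketch below shows placeholder substitution feeding into a `java.lang.ProcessBuilder`. It assumes a much-simplified command layout (`java -Xmx<mem>m -cp <classpath> <mainClass> <args>`). `buildProcessBuilder` here is a hypothetical stand-in, not Spark's `CommandUtils.buildProcessBuilder`, which derives the classpath, Java options, and environment from the `Command`.

```scala
import java.nio.file.Paths

object BuildProcessSketch {
  // Replace the placeholders that appear in driver arguments.
  def substituteVariables(workerUrl: String, userJar: String)(argument: String): String =
    argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}"   => userJar
      case other            => other
    }

  // Hypothetical, much-simplified stand-in for CommandUtils.buildProcessBuilder:
  // <java> -Xmx<mem>m -cp <classPath> <mainClass> <args with placeholders substituted>
  def buildProcessBuilder(mainClass: String, args: Seq[String], memMb: Int,
                          classPath: String, substitute: String => String): ProcessBuilder = {
    val javaBin = Paths.get(System.getProperty("java.home"), "bin", "java").toString
    val command = Seq(javaBin, s"-Xmx${memMb}m", "-cp", classPath, mainClass) ++ args.map(substitute)
    new ProcessBuilder(command: _*) // nothing runs until builder.start() is called
  }
}
```

Passing the substitution in as a function keeps the builder generic. The executor path can reuse it with its own placeholders.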
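5. Once the driver process exits (or launching it fails), the runner sends the Worker a `DriverStateChanged` message carrying the driver's final state and any exception:
```scala
worker.send(DriverStateChanged(driverId, finalState.get, finalException))
```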
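The sketch below shows how a final state could be derived before it is reported. It models only a subset of Spark's `DriverState` values as a plain `Enumeration`. The decision order is an approximation: the exact checks in `DriverRunner` differ slightly between Spark versions.

```scala
object DriverStateSketch {
  // A subset of Spark's DriverState values, modeled as a plain Enumeration.
  object DriverState extends Enumeration {
    val FINISHED, KILLED, FAILED, ERROR = Value
  }

  // Decide the final state once the driver process has exited (or failed to launch):
  // killed -> KILLED, launch exception -> ERROR, exit code 0 -> FINISHED, else FAILED.
  def finalState(killed: Boolean, exception: Option[Throwable],
                 exitCode: Option[Int]): DriverState.Value =
    if (killed) DriverState.KILLED
    else if (exception.isDefined) DriverState.ERROR
    else exitCode match {
      case Some(0) => DriverState.FINISHED
      case _       => DriverState.FAILED
    }
}
```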
6. Diagram of the whole flow: [figure not available]
III. Creating an Executor, step by step
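1. A request to launch an executor arrives as a `LaunchExecutor` message, which the Worker likewise handles in `receive`:
```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
```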
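Before doing any work, the real handler checks that `masterUrl` is the Worker's currently active master. Requests from any other master are ignored with a warning. On success, the Worker also adds the executor's cores and memory to its usage counters.

The sketch below shows that guard and the bookkeeping. It uses a hypothetical `WorkerStub` and a trimmed-down message type, not Spark's classes.

```scala
object LaunchExecutorGuardSketch {
  // Hypothetical, trimmed-down message; Spark's version also carries an ApplicationDescription.
  final case class LaunchExecutor(masterUrl: String, appId: String, execId: Int,
                                  cores: Int, memory: Int)

  final class WorkerStub(var activeMasterUrl: String) {
    var coresUsed = 0
    var memoryUsed = 0

    // Returns true if the request was accepted.
    def handle(msg: LaunchExecutor): Boolean =
      if (msg.masterUrl != activeMasterUrl) {
        false // request from a stale or unknown master: ignore it
      } else {
        coresUsed += msg.cores
        memoryUsed += msg.memory
        true
      }
  }
}
```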
2. The Worker then creates the executor's working directory (`<workDir>/<appId>/<execId>`) and an `ExecutorRunner` object to manage the executor:
```scala
// Create an ExecutorRunner to manage the executor
val manager = new ExecutorRunner(
  appId,
  execId,
  appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
  cores_,
  memory_,
  self,
  workerId,
  host,
  webUi.boundPort,
  publicAddress,
  sparkHome,
  executorDir,
  workerUri,
  conf,
  appLocalDirs, ExecutorState.RUNNING)
```
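The sketch below shows the Worker-side bookkeeping around this constructor: creating the per-executor directory and recording the runner under an `"appId/execId"` key. `RunnerStub` and `WorkerState` are hypothetical stand-ins, not Spark's classes.

```scala
import java.io.{File, IOException}
import scala.collection.mutable

object ExecutorRegistrySketch {
  // Hypothetical stand-in for ExecutorRunner: only its identity and directory.
  final case class RunnerStub(appId: String, execId: Int, executorDir: File) {
    def fullId: String = s"$appId/$execId"
  }

  final class WorkerState(workDir: File) {
    // Runners keyed by "appId/execId", so later state messages can find them.
    val executors = mutable.HashMap.empty[String, RunnerStub]

    def launch(appId: String, execId: Int): RunnerStub = {
      // Create <workDir>/<appId>/<execId> before constructing the runner.
      val executorDir = new File(workDir, s"$appId/$execId")
      if (!executorDir.isDirectory && !executorDir.mkdirs()) {
        throw new IOException(s"Failed to create directory $executorDir")
      }
      val runner = RunnerStub(appId, execId, executorDir)
      executors(runner.fullId) = runner
      runner
    }
  }
}
```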
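3. The Worker then calls the runner's `start()` method, which starts a dedicated thread that runs `fetchAndRunExecutor()`:
```scala
// In Worker
manager.start()

// In ExecutorRunner
private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // ... (a shutdown hook that kills the executor when the Worker exits is registered here)
}
```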
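The sketch below shows the same threading pattern in isolation. `start()` returns immediately, and the possibly long-running work happens on a named background thread. `RunnerSketch` and its `join()` helper are hypothetical; `join()` exists only so a caller can observe completion.

```scala
object RunnerThreadSketch {
  // Hypothetical stand-in for ExecutorRunner's threading.
  final class RunnerSketch(val fullId: String, fetchAndRun: () => Unit) {
    @volatile private var workerThread: Thread = null

    // Like ExecutorRunner.start(): spawn a named thread and return at once.
    def start(): Unit = {
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run(): Unit = fetchAndRun()
      }
      workerThread.start()
    }

    // Block until the background work has finished.
    def join(): Unit = workerThread.join()
  }
}
```

Running the launch on its own thread keeps the Worker's message loop free while the executor process is set up and waited on.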
4. `fetchAndRunExecutor()` builds the executor's `ProcessBuilder` with the same `CommandUtils.buildProcessBuilder` helper used for drivers:
```scala
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    // ...
```
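Before starting the process, `fetchAndRunExecutor()` also points it at the executor's directory and exports the application's local directories through the `SPARK_EXECUTOR_DIRS` environment variable. The sketch below shows only those two `ProcessBuilder` calls. `configure` is a hypothetical helper name.

```scala
import java.io.File

object ExecutorProcessConfigSketch {
  // Hypothetical helper: set the working directory and export the local dirs,
  // joined with the platform's path separator, before the process is started.
  def configure(builder: ProcessBuilder, executorDir: File,
                appLocalDirs: Seq[String]): ProcessBuilder = {
    builder.directory(executorDir)
    builder.environment().put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    builder
  }
}
```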
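5. Finally, `builder.start()` launches the executor process (in standalone mode, a JVM running `CoarseGrainedExecutorBackend`). `ExecutorRunner` then waits for the process to exit and reports the result to the Worker with an `ExecutorStateChanged` message.
```scala
process = builder.start()
```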
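The sketch below shows the start, wait, and report sequence. It assumes output should land in `stdout`/`stderr` files in the executor directory.

Spark itself copies the process streams with its own file appenders. This sketch swaps in `ProcessBuilder.redirectOutput`/`redirectError` for brevity. The returned message only imitates the kind of text sent with `ExecutorStateChanged`.

```scala
import java.io.File

object LaunchAndWaitSketch {
  // Start the process with stdout/stderr written to files in executorDir,
  // block until it exits, and return the exit code plus a status message.
  def launchAndWait(command: Seq[String], executorDir: File): (Int, String) = {
    val builder = new ProcessBuilder(command: _*)
    builder.directory(executorDir)
    builder.redirectOutput(new File(executorDir, "stdout"))
    builder.redirectError(new File(executorDir, "stderr"))
    val process = builder.start()
    val exitCode = process.waitFor() // blocks this (runner) thread, not the Worker
    (exitCode, "Command exited with code " + exitCode)
  }
}
```

Usage: launching the current JVM with `-version` exits with code 0 and writes its version banner to the `stderr` file.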
6. Diagram: [figure not available]
IV. Overall execution flow