首先希望大家健康平安。 对于互联网公司的大数据,一般都会需要一个数据开发平台。 incubator-dolphinscheduler再适合不过了。今天我也开始进行学习一下,然后记录一下学习的内容。 通过阅读源码从简单的地方入手学习,来看一下具体的实现逻辑,对后续的安装部署和使用中遇到的问题肯定 会有帮助。
1-代码下载和查看
mvn -U clean package -Prelease -Dmaven.test.skip=true mvn install
2-官网的定义
看源码之前先了解这些细节.会对源码中相关的概念有帮助 DAG: 全称Directed Acyclic Graph,简称DAG。工作流中的Task任务以有向无环图的形式组装起来, 从入度为零的节点进行拓扑遍历,直到无后继节点为止。 流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG 流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次, 产生一个流程实例 任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态 DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心, 并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务
dolphinscheduler-api
dolphinscheduler-api模块主要提供了和前端交互的接口。 为了了解整个系统的运行情况,我们只要先关心几个接口就可以。
1-ProcessDefinitionController.createProcessDefinition: 流程定义,保存我们页面定义的流程。 2-ExecutorController.startProcessInstance: 直接运行的任务,不用调度 产生一条Command到数据库中:processDao.createCommand(command); 具体代码就不详细列出。到这里我们知道需要进行调度的流程已经产生了一条command记录到我们的数据库中。 3-SchedulerController.createSchedule: 需要定时调度的任务 创建流程的调度信息到表中t_ds_schedules 4-SchedulerController.online 这里添加任务到调度中 QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,schedule.getCrontab(), dataMap);
不管任务是否需要定时调度,都需要先进行流程定义
dolphinscheduler-server
这里我们主要看两个功能: master和work,一个负责调度任务流,一个负责执行具体的任务。 先通过学习master来了解它是如何对任务流进行调度的: MasterServer中的main函数会启动我们的MasterServer Spring实例化该Bean之后马上执行@PostConstruct注解下的run()方法
1-初始化zk相关的目录,注册master的时候会获取master的cpu,内存等信息 zkMasterClient.init(); 保存master的机器信息 /masters/hosts -> heartbeatZKInfo
2-提供了心跳,定时去更新master的机器信息
Runnable heartBeatThread = heartBeatThread(); heartbeatMasterService.scheduleAtFixedRate(heartBeatThread, 5, masterConfig.getMasterHeartbeatInterval(), TimeUnit.SECONDS);
3-QuartzExecutors.getInstance().start(); 启动quartz调度.才能添加调度任务。
4-MasterSchedulerThread:扫描线程,定时扫描数据库中的 command 表,根据不同的命令类型进行不同的 业务操作 既然需要去数据库中获取command,我们就先来看看command是从哪里插入的吧。上面dolphinscheduler-api 已经写了不用定时调度的command是从哪里进行插入的。 现在我们来看看需要定时调度的command是从哪里插入的。 在api的接口SchedulerController.online中我们看到了
QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,schedule.getCrontab(), dataMap);
它会将任务加入到调度中,定时执行 Quartz具体用法可以自己去看一下,我们只要知道真正执行任务的代码在ProcessScheduleJob这个类里面:
//1-根据scheduleId获取当前调度任务的详细调度信息 Schedule schedule = processDao.querySchedule(scheduleId); //2-从schedule信息中获取ProcessDefinitionId,获取流程定义的信息 ProcessDefinition processDefinition = processDao.findProcessDefineById(schedule.getProcessDefinitionId()); //3-创建调度的command Command command = new Command(); command.setCommandType(CommandType.SCHEDULER); ...... processDao.createCommand(command);
到这里我们就知道定时调度流程的command在这里被插入了。
5-回到MasterSchedulerThread源码分析: MasterSchedulerThread implements Runnable 我们之间看它的run方法
while (Stopper.isRunning()){ // process instance ProcessInstance processInstance = null; InterProcessMutex mutex = null; try { //1-检查当前master的cpu和内存情况是否符合要求,可以直接配置cpu和内存的阀值,使用过程中如果遇到类似日志可以检查这两个参数的配置:load or availablePhysicalMemorySize(G) is too high... boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); if(!runCheckFlag) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters //2-使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler String znodeLock = zkMasterClient.getMasterLockPath(); mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock); mutex.acquire(); ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService; int activeCount = poolExecutor.getActiveCount(); //3-获取一条command,对应之前运行流程保存的command /** * select command.* from t_ds_command command join t_ds_process_definition definition on command.process_definition_id = definition.id where definition.release_state = 1 AND definition.flag = 1 order by command.update_time asc limit 1 * * */ // make sure to scan and delete command table in one transaction Command command = processDao.findOneCommand(); if (command != null) { logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute(new MasterExecThread(processInstance,processDao)); } }catch (Exception e){ logger.error("scan command error ", e); processDao.moveToErrorCommand(command, e.toString()); } } else{ //indicate that no command ,sleep for 1s Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } }catch (Exception e){ logger.error("master scheduler thread exception",e); }finally{ AbstractZKClient.releaseMutex(mutex); } } @Transactional(rollbackFor = Exception.class) public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) { //1-根据getProcessDefinitionId构造一条流程实例。 ProcessInstance processInstance = constructProcessInstance(command, host); //cannot construct process instance, return null; if(processInstance == null){ logger.error("scan command, command parameter is error: %s", command.toString()); moveToErrorCommand(command, "process instance is null"); return null; } if(!checkThreadNum(command, validThreadNum)){ logger.info("there is not enough thread for this command: {}",command.toString() ); return setWaitingThreadProcess(command, processInstance); } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); //2-保存流程实例信息 saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); //3-删除当前的command delCommandByid(command.getId()); return processInstance; }
具体逻辑都写在了代码的注释里面。
6-MasterExecThread:负责DAG任务切分、任务提交监控、各种不同命令类型的逻辑处理一样看run方法 这里只关注任务流的执行: // execute flow executeProcess(); 主要关注runProcess()方法.里面涉及到了解析前端的dag图的方法,这里也还没仔细研究。但是不影响 我们看整体的流程。 a-首先肯定会获取任务流中起始的任务节点先开始运行
// submit start node,提交任务流的头结点 submitPostNode(null); task会添加到readyToSubmitTaskList中
b-提交task到zk队列,具体实现在后面的MasterTaskExecThread中
if(canSubmitTaskToQueue()){ submitStandByTask(); }
从readyToSubmitTaskList中获取待提交的task 提交的任务信息会保存在activeTaskNode中
c-对已经提交的任务进行监控 activeTaskNode
//查看正在运行的task任务。 for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: activeTaskNode.entrySet()) { Future<Boolean> future = entry.getValue(); TaskInstance task = entry.getKey().getTaskInstance(); if(!future.isDone()){ continue; } // node monitor thread complete activeTaskNode.remove(entry.getKey()); if(task == null){ this.taskFailedSubmit = true; continue; } logger.info("task :{}, id:{} complete, state is {} ", task.getName(), task.getId(), task.getState().toString()); // node success , post node submit if(task.getState() == ExecutionStatus.SUCCESS){ completeTaskList.put(task.getName(), task); //2.1继续提交任务流中当前任务的后置任务 submitPostNode(task.getName()); continue; } // node fails, retry first, and then execute the failure process if(task.getState().typeIsFailure()){ if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ this.recoverToleranceFaultTaskList.add(task); } if(task.taskCanRetry()){ addTaskToStandByList(task); }else{ // node failure, based on failure strategy errorTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task); if(processInstance.getFailureStrategy() == FailureStrategy.END){ killTheOtherTasks(); } } continue; } // other status stop/pause completeTaskList.put(task.getName(), task); }
7-MasterTaskExecThread:负责任务的提交持久化,work拉取的任务从这里进行提交
private TaskInstance submitTaskExec(TaskInstance taskInstance) { MasterBaseTaskExecThread abstractExecThread = null; if(taskInstance.isSubProcess()){ abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance); }else { abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance); } //提交task到zk队列 Future<Boolean> future = taskExecService.submit(abstractExecThread); //提交完成将任务加入到activeTaskNode,方便对已经提交的任务进行监控判断其是否已经完成 //abstractExecThread对应当前提交的task,future,可以获取执行的结果 activeTaskNode.putIfAbsent(abstractExecThread, future); return abstractExecThread.getTaskInstance(); }
taskExecService.submit(abstractExecThread);提交的线程任务会执行下面call方法,线程池submit提交的 任务通过Future可以获取返回的结果
@Override public Boolean call() throws Exception { return submitWaitComplete(); } //submit task instance and wait complete,提交task并且等待其完成 @Override public Boolean submitWaitComplete() { Boolean result = false; //提交task到db和zk队列,具体实现在MasterBaseTaskExecThread中,可以自己去看:submitQueue = processDao.submitTaskToQueue(task); this.taskInstance = submit(); if(this.taskInstance == null){ logger.error("submit task instance to mysql and queue failed , please check and fix it"); return result; } if(!this.taskInstance.getState().typeIsFinished()) { result = waitTaskQuit(); } taskInstance.setEndTime(new Date()); processDao.updateTaskInstance(taskInstance); logger.info("task :{} id:{}, process id:{}, exec thread completed ", this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() ); return result; } waitTaskQuit()方法中会通过while循环不停的去数据库检查taskInstance的状态, 判断当前task是否已经执行完成。 // task instance finished if (taskInstance.getState().typeIsFinished()){ break; }
如果执行成功就会返回Boolean true. 这样在MasterExecThread类中的 runProcess()方法里面就可以监控当前task是否已经完成
Future<Boolean> future = entry.getValue(); TaskInstance task = entry.getKey().getTaskInstance(); //如果没有完成就continue if(!future.isDone()){ continue; } //执行完成就可以提交当前task的后置任务了 ......
今天我的学习就到这里了,关于Dolphin Scheduler中的work实现后面继续学习,尽量也把zk队列和日志模 块也早点学习完成。争取后面在使用过程中可以一切顺利。