手记

dolphinscheduler入门学习

首先希望大家健康平安。
对于互联网公司的大数据,一般都会需要一个数据开发平台。
incubator-dolphinscheduler再适合不过了。今天我也开始进行学习一下,然后记录一下学习的内容。
通过阅读源码从简单的地方入手学习,来看一下具体的实现逻辑,对后续的安装部署和使用中遇到的问题肯定
会有帮助。

 1-代码下载和查看

 mvn -U clean package -Prelease -Dmaven.test.skip=true
 mvn install

 2-官网的定义

看源码之前先了解这些细节.会对源码中相关的概念有帮助
DAG: 全称Directed Acyclic Graph,简称DAG。工作流中的Task任务以有向无环图的形式组装起来,
从入度为零的节点进行拓扑遍历,直到无后继节点为止。
流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG
流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,
产生一个流程实例
任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态
DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,
并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务


dolphinscheduler-api

dolphinscheduler-api模块主要提供了和前端交互的接口。
为了了解整个系统的运行情况,我们只要先关心几个接口就可以。
1-ProcessDefinitionController.createProcessDefinition:
 流程定义,保存我们页面定义的流程。
2-ExecutorController.startProcessInstance:  直接运行的任务,不用调度
 产生一条Command到数据库中:processDao.createCommand(command);
 具体代码就不详细列出。到这里我们知道需要进行调度的流程已经产生了一条command记录到我们的数据库中。
3-SchedulerController.createSchedule: 需要定时调度的任务
 创建流程的调度信息到表中t_ds_schedules
4-SchedulerController.online
 这里添加任务到调度中
 QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, 
 startDate, endDate,schedule.getCrontab(), dataMap);
 不管任务是否需要定时调度,都需要先进行流程定义

 dolphinscheduler-server

这里我们主要看两个功能:
master和work,一个负责调度任务流,一个负责执行具体的任务。
先通过学习master来了解它是如何对任务流进行调度的:
MasterServer中的main函数会启动我们的MasterServer
Spring实例化该Bean之后马上执行@PostConstruct注解下的run()方法
1-初始化zk相关的目录,注册master的时候会获取master的cpu,内存等信息
zkMasterClient.init();
保存master的机器信息
/masters/hosts  -> heartbeatZKInfo
2-提供了心跳,定时去更新master的机器信息
Runnable heartBeatThread = heartBeatThread();
  heartbeatMasterService.scheduleAtFixedRate(heartBeatThread, 5, 
  masterConfig.getMasterHeartbeatInterval(), TimeUnit.SECONDS);
3-QuartzExecutors.getInstance().start();
启动quartz调度.才能添加调度任务。
4-MasterSchedulerThread:扫描线程,定时扫描数据库中的 command 表,根据不同的命令类型进行不同的
业务操作
既然需要去数据库中获取command,我们就先来看看command是从哪里插入的吧。上面dolphinscheduler-api
已经写了不用定时调度的command是从哪里进行插入的。
现在我们来看看需要定时调度的command是从哪里插入的。
在api的接口SchedulerController.online中我们看到了
QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, 
endDate,schedule.getCrontab(), dataMap);
它会将任务加入到调度中,定时执行              
Quartz具体用法可以自己去看一下,我们只要知道真正执行任务的代码在ProcessScheduleJob这个类里面:


//1-根据scheduleId获取当前调度任务的详细调度信息       
Schedule schedule = processDao.querySchedule(scheduleId);
//2-从schedule信息中获取ProcessDefinitionId,获取流程定义的信息
ProcessDefinition processDefinition = processDao.findProcessDefineById(schedule.getProcessDefinitionId());
  
//3-创建调度的command
Command command = new Command();
command.setCommandType(CommandType.SCHEDULER);
  
  ......     
processDao.createCommand(command);
到这里我们就知道定时调度流程的command在这里被插入了。

 

5-回到MasterSchedulerThread源码分析:
MasterSchedulerThread implements Runnable
我们之间看它的run方法
 while (Stopper.isRunning()){
            // process instance
            ProcessInstance processInstance = null;
            InterProcessMutex mutex = null;
            try {
                //1-检查当前master的cpu和内存情况是否符合要求,可以直接配置cpu和内存的阀值,使用过程中如果遇到类似日志可以检查这两个参数的配置:load or availablePhysicalMemorySize(G) is too high...
                boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
                if(!runCheckFlag) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                    // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/masters
                    //2-使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler
                    String znodeLock = zkMasterClient.getMasterLockPath();
                    mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
                    mutex.acquire();
                    ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
                    int activeCount = poolExecutor.getActiveCount();
                    //3-获取一条command,对应之前运行流程保存的command
                    /**
                     * select command.* from t_ds_command command
                     join t_ds_process_definition definition on command.process_definition_id = definition.id
                     where definition.release_state = 1 AND definition.flag = 1
                     order by command.update_time asc
                     limit 1
                     *
                     * */
                    // make sure to scan and delete command  table in one transaction
                    Command command = processDao.findOneCommand();
                    if (command != null) {
                        logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
                        try{
                            processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
                            if (processInstance != null) {
                                logger.info("start master exec thread , split DAG ...");
                                masterExecService.execute(new MasterExecThread(processInstance,processDao));
                            }
                        }catch (Exception e){
                            logger.error("scan command error ", e);
                            processDao.moveToErrorCommand(command, e.toString());
                        }
                    } else{
                        //indicate that no command ,sleep for 1s
                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    }
                }
            }catch (Exception e){
                logger.error("master scheduler thread exception",e);
            }finally{
                AbstractZKClient.releaseMutex(mutex);
            }
        }
 
  @Transactional(rollbackFor = Exception.class)
    public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
        //1-根据getProcessDefinitionId构造一条流程实例。
        ProcessInstance processInstance = constructProcessInstance(command, host);
        //cannot construct process instance, return null;
        if(processInstance == null){
            logger.error("scan command, command parameter is error: %s", command.toString());
            moveToErrorCommand(command, "process instance is null");
            return null;
        }
        if(!checkThreadNum(command, validThreadNum)){
            logger.info("there is not enough thread for this command: {}",command.toString() );
            return setWaitingThreadProcess(command, processInstance);
        }
        processInstance.setCommandType(command.getCommandType());
        processInstance.addHistoryCmd(command.getCommandType());
        //2-保存流程实例信息
        saveProcessInstance(processInstance);
        this.setSubProcessParam(processInstance);
        //3-删除当前的command
        delCommandByid(command.getId());
        return processInstance;
    }
具体逻辑都写在了代码的注释里面。

 

6-MasterExecThread:负责DAG任务切分、任务提交监控、各种不同命令类型的逻辑处理一样看run方法
这里只关注任务流的执行:
// execute flow
executeProcess();
 
主要关注runProcess()方法.里面涉及到了解析前端的dag图的方法,这里也还没仔细研究。但是不影响
我们看整体的流程。
a-首先肯定会获取任务流中起始的任务节点先开始运行


// submit start node,提交任务流的头结点
  submitPostNode(null);
  task会添加到readyToSubmitTaskList中
b-提交task到zk队列,具体实现在后面的MasterTaskExecThread中
if(canSubmitTaskToQueue()){
      submitStandByTask();
  }
从readyToSubmitTaskList中获取待提交的task
提交的任务信息会保存在activeTaskNode中
c-对已经提交的任务进行监控  activeTaskNode
//查看正在运行的task任务。
            for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: activeTaskNode.entrySet()) {
                Future<Boolean> future = entry.getValue();
                TaskInstance task  = entry.getKey().getTaskInstance();
                if(!future.isDone()){
                    continue;
                }
                // node monitor thread complete
                activeTaskNode.remove(entry.getKey());
                if(task == null){
                    this.taskFailedSubmit = true;
                    continue;
                }
                logger.info("task :{}, id:{} complete, state is {} ",
                        task.getName(), task.getId(), task.getState().toString());
                // node success , post node submit
                if(task.getState() == ExecutionStatus.SUCCESS){
                    completeTaskList.put(task.getName(), task);
                    //2.1继续提交任务流中当前任务的后置任务
                    submitPostNode(task.getName());
                    continue;
                }
                // node fails, retry first, and then execute the failure process
                if(task.getState().typeIsFailure()){
                    if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
                        this.recoverToleranceFaultTaskList.add(task);
                    }
                    if(task.taskCanRetry()){
                        addTaskToStandByList(task);
                    }else{
                        // node failure, based on failure strategy
                        errorTaskList.put(task.getName(), task);
                        completeTaskList.put(task.getName(), task);
                        if(processInstance.getFailureStrategy() == FailureStrategy.END){
                            killTheOtherTasks();
                        }
                    }
                    continue;
                }
                // other status stop/pause
                completeTaskList.put(task.getName(), task);
            }
7-MasterTaskExecThread:负责任务的提交持久化,work拉取的任务从这里进行提交

 

  private TaskInstance submitTaskExec(TaskInstance taskInstance) {
        MasterBaseTaskExecThread abstractExecThread = null;
        if(taskInstance.isSubProcess()){
            abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
        }else {
            abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
        }
        //提交task到zk队列
        Future<Boolean> future = taskExecService.submit(abstractExecThread);
        //提交完成将任务加入到activeTaskNode,方便对已经提交的任务进行监控判断其是否已经完成
        //abstractExecThread对应当前提交的task,future,可以获取执行的结果
        activeTaskNode.putIfAbsent(abstractExecThread, future);
        return abstractExecThread.getTaskInstance();
    }

 

taskExecService.submit(abstractExecThread);提交的线程任务会执行下面call方法,线程池submit提交的
任务通过Future可以获取返回的结果
  @Override
    public Boolean call() throws Exception {
        return submitWaitComplete();
    }
    //submit task instance and wait complete,提交task并且等待其完成
    @Override
    public Boolean submitWaitComplete() {
        Boolean result = false;
        //提交task到db和zk队列,具体实现在MasterBaseTaskExecThread中,可以自己去看:submitQueue = processDao.submitTaskToQueue(task);
        this.taskInstance = submit();
        if(this.taskInstance == null){
            logger.error("submit task instance to mysql and queue failed , please check and fix it");
            return result;
        }
        if(!this.taskInstance.getState().typeIsFinished()) {
            result = waitTaskQuit();
        }
        taskInstance.setEndTime(new Date());
        processDao.updateTaskInstance(taskInstance);
        logger.info("task :{} id:{}, process id:{}, exec thread completed ",
                this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
        return result;
    }
    waitTaskQuit()方法中会通过while循环不停的去数据库检查taskInstance的状态,
    判断当前task是否已经执行完成。
     // task instance finished
     if (taskInstance.getState().typeIsFinished()){
          break;
     }
如果执行成功就会返回Boolean  true.
这样在MasterExecThread类中的
runProcess()方法里面就可以监控当前task是否已经完成
    Future<Boolean> future = entry.getValue();
   TaskInstance task  = entry.getKey().getTaskInstance();
   //如果没有完成就continue
   if(!future.isDone()){
      continue;
   }  
   //执行完成就可以提交当前task的后置任务了
                ......

 

今天我的学习就到这里了,关于Dolphin Scheduler中的work实现后面继续学习,尽量也把zk队列和日志模
块也早点学习完成。争取后面在使用过程中可以一切顺利。

 

 


4人推荐
随时随地看视频
慕课网APP