手记

dolphinscheduler入门学习(2)

之前入门了整个dolphinscheduler的模块和master的大概流程:https://www.imooc.com/article/300193
今天属性一下worker。worker肯定是用来执行具体的调度任务的。
通过之前对master的了解,已经知道了dolphinscheduler将需要执行的task放入了用zk实现的队列中。
所以worker肯定是从这个队列中取出task,然后进行执行。
先从WorkerServer入手,同样也会进行run方法:
    @PostConstruct
    public void run(){
    
    ...
    }

1- zkWorkerClient.init();


 注册work,在zk创建相关目录,并且保存当前机器的cup内存等信息。
 String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
  // create temporary sequence nodes for master znode
  String registerPath= getZNodeParentPath(zkNodeType) + SINGLE_SLASH + host;
 super.persistEphemeral(registerPath, heartbeatZKInfo);


2-this.taskQueue = TaskQueueFactory.getTaskQueueInstance();


这里获取ITaskQueue
实现类是TaskQueueZkImpl基于zookeeper实现。里面有master放入的待执行的task.


3-提供了心跳,定期更新当前机器的信息

 Runnable heartBeatThread = heartBeatThread();    
 String str = splits[0] + Constants.COMMA
+ splits[1] + Constants.COMMA
+ OSUtils.cpuUsage() + Constants.COMMA
+ OSUtils.memoryUsage() + Constants.COMMA
+ OSUtils.loadAverage() + Constants.COMMA
+ splits[5] + Constants.COMMA
+ DateUtils.dateToString(new Date());
zkClient.setData().forPath(znode,str.getBytes());


4-FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processDao, taskQueue);

FetchTaskThread真正去拉取task的线程。
具体实现看它的run()方法:
A 首先会去检查机器的负载情况和worker线程使用情况
 //check memory and cpu usage and threads
 boolean runCheckFlag = OSUtils.checkResource(workerConfig.getWorkerMaxCpuloadAvg(), workerConfig.getWorkerReservedMemory()) && checkThreadCount(poolExecutor);
 如果都符合条件就会继续。

B boolean hasTask = taskQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
先判断队列中是否有task,有的话继续
获取ZK的分布式锁
  // creating distributed locks, lock path /dolphinscheduler/lock/worker
  mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
                      zkWorkerClient.getWorkerLockPath()); 
                     
C 具体获取锁的方法如下                    
 InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath);
 mutex.acquire();      
 大体原理就是利用zk的临时顺序节点,最小的节点可以获取锁,后面的节点对前面的节点进行监听。              
                        
D 拉取task
 当获取分布式锁成功后进行获取task,taskNum设置一次获取的数量
 List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);                        
 重点关注一下poll方法:
 会根据流程的优先级和任务的优先级对task进行排序。
 通过TreeSet实现排序的规则,通过比较字符串的大小:
 Set<String> taskTreeSet = new TreeSet<>(new Comparator<String>() {
          @Override
          public int compare(String o1, String o2) {
              String s1 = o1;
              String s2 = o2;
              String[] s1Array = s1.split(Constants.UNDERLINE);
              if(s1Array.length>4){
                 // warning: if this length > 5, need to be changed
                 s1 = s1.substring(0, s1.lastIndexOf(Constants.UNDERLINE) );
              }
              String[] s2Array = s2.split(Constants.UNDERLINE);
              if(s2Array.length>4){
                 // warning: if this length > 5, need to be changed
                 s2 = s2.substring(0, s2.lastIndexOf(Constants.UNDERLINE) );
              }
              return s1.compareTo(s2);
             }
          }); 
 
 
 添加任务的具体方法,会过滤掉当前work无权处理的task:
 for (int i = 0; i < size; i++) {
  
                    String taskDetail = list.get(i);
                    String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
                    //forward compatibility
                    if(taskDetailArrs.length >= 4){
                        //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
                        String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0],
                         Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
                        if(taskDetailArrs.length > 4){
                            String taskHosts = taskDetailArrs[4];
                            //这里会判断task是否可以分给任何的work,如果不能,则判断当前work是否符合当前任务需要分配的work分组。
                      
                            //task can assign to any worker host if equals default ip value of worker server
                            if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){
                                String[] taskHostsArr = taskHosts.split(Constants.COMMA);
                                if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
                                    continue;
                                }
                            }
                            formatTask += Constants.UNDERLINE + taskDetailArrs[4];
                        }
                        taskTreeSet.add(formatTask);
                    }
                }


5-获取的task信息列表后会从数据库获取到具体task的taskInstance。       

taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);


6-提交任务

 // submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
TaskScheduleThread是具体执行task的线程
具体是执行方法就是run()
task = TaskManager.newTask(taskInstance.getTaskType(),
                    taskProps,
                    taskLogger);
这里面会根据任务的类型获取具体的实现类:
  public static AbstractTask newTask(String taskType, TaskProps props, Logger logger)
      throws IllegalArgumentException {
    switch (EnumUtils.getEnum(TaskType.class,taskType)) {
        case SHELL:
        return new ShellTask(props, logger);
      case PROCEDURE:
        return new ProcedureTask(props, logger);
      case SQL:
        return new SqlTask(props, logger);
      case MR:
        return new MapReduceTask(props, logger);
      case SPARK:
        return new SparkTask(props, logger);
      case FLINK:
        return new FlinkTask(props, logger);
      case PYTHON:
        return new PythonTask(props, logger);
      case DEPENDENT:
        return new DependentTask(props, logger);
      case HTTP:
        return new HttpTask(props, logger);
      default:
        logger.error("unsupport task type: {}", taskType);
        throw new IllegalArgumentException("not support task type");
    }
  }
}
调用具体实现类中的方法来完成task
            // task init
            task.init();
            // task handle
            task.handle();
            // task result process
            task.after();
            
            
在master的了解中,每个task的执行过程,master会对其数据库中的状态进行监控。所以task执行完成肯定会更
新数据库的状态。更新状态的代码如下:
processDao.changeTaskState(task.getExitStatus(),
                new Date(),
                taskInstance.getId());    
                
但是任务具体执行成功还是失败的status是在task.after();中进行设置的。           
具体实现就不贴出代码了。
当发现数据库中这个task的status是成功时,这样master就可以继续提交后置的task到zk队列中了。

                 

7-总结

其他细节就暂时不深入了解了,知道了大体实现,后续准备继续学习环境的搭建和使用。



0人推荐
随时随地看视频
慕课网APP