手记

基于时间轮的定时任务

最近使用定时任务框架看到xxl-job中去除了常用的quartz,自己实现了一个定时任务,
很有兴趣,就去看了下源码,发现是基于时间轮去实现的。
时间轮是什么呢?大家可以先去查一些资料,看一些图片和介绍,这里会直接在xxl-job的
使用和源码中去逐步探索。

xxl-job中定时任务如何运行

1-首先页面会配置任务,一般会为每个任务配置一个cron表达式并且保存到数据库中。
2-当点击执行该定时任务的时候,会根据cron表达式解析出其下次执行的时间,更新
到数据库中。
3-最后会有一个线程去不断的去数据库拉取最近5s需要执行的任务,然后通过时间轮
来进行将任务调度。
调度的主要逻辑都集中在JobScheduleHelper类中。

JobScheduleHelper的实现

JobScheduleHelper是一个单列。
1-来看看类中的一些属性:
    public static final long PRE_READ_MS = 5000;    // pre read

    private Thread scheduleThread;
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
主要包含了两个线程和一个Map集合。

2-JobScheduleHelper.start()方法
项目启动的时间,就会进行start()方法的逻辑,会分别启动scheduleThread
和ringThread两个线程任务。

3-scheduleThread
scheduleThread主要作用就是从数据库中获取最近5s需要执行的任务,进行判断该
任务是否需要立刻执行、下次执行、还是放入时间轮中等待执行。

由于在xxl-job中master是没有中心节点的,所以调度可以每个节点都在执行,这里
为了方式相同节点同一时间执行相同的任务导致重复,利用了Mysql的行锁。
CREATE TABLE `xxl_job_lock` (
  `lock_name` varchar(50) NOT NULL COMMENT '锁名称',
  PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

 conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( 
"select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
使用mysql的select ...for update 语句将快照读变成了当前读,会对所读取的记录
进行加锁。这里lock_name 为主键,所以这里的select 会对查询的记录加上行锁,
当其他节点也在执行此sql查询的时候,因为当前查询语句的事物还没有结束,其他节点
的查询就会进行阻塞,直到当前事物提交。

List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().
 scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
获取到最近5s需要执行的任务集合
会根据任务下次执行的时间来执行不同的分支,这里只看如果获取到的任务,还差几秒
中才需要执行怎么办?
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
 //当前时间>任务下次执行时间+5,则说明任务过期了下次再执行
 // fresh next
 refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
  //当前时间>任务下次执行时间,并且在5s内,说明任务应该马上执行
  ......
} else {
//还差几秒种任务需要执行
// 1、make ring second  首先获取当前任务应该在哪一秒被执行
// 因为都是最近5s需要执行的任务,所以这里计算出具体需要执行的秒就可以
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 2、push time ring  放入map中
pushTimeRing(ringSecond, jobInfo.getId());

// 3、fresh next  计算下次需要执行的时间
refreshNextValidTime(jobInfo, newDate(jobInfo.getTriggerNextTime()));
}

4-pushTimeRing
 private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    }
可以看到开始定义的map ringData 根据秒作为key,当前秒需要执行的jobId的集合
作为value.
意思就是最近5s获取的任务,如果需要在同一秒执行,则会拥有相同的key.

5-ringThread
ringThread做了什么呢
ringThread也是不断轮训,每次轮训会间隔1s
// next second, align second
TimeUnit.MILLISECONDS.sleep(1000System.currentTimeMillis()%1000);

每次先获取当前时刻的秒值,有了这个秒值,就可以去map中去获取任务集合了。
 // second data
 List<Integer> ringItemData = new ArrayList<>();
 int nowSecond = Calendar.getInstance().get(Calendar.SECOND);  
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
      List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
      if (tmpData != null) {
       ringItemData.addAll(tmpData);
      }
}
将获取到的任务提交到线程池去执行调度。
if (ringItemData.size() > 0) {
     // do trigger
     for (int jobId: ringItemData) {
           // do trigger
       JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
     }
     // clear
     ringItemData.clear();
}
   
在xxl-job中定时任务的调度整体实现就是这样,仔细想来是不是感觉很有意思。
借助了数据库的行锁,获取最近5秒需要执行的任务,将任务真正执行时间的秒值作为
key放入map中,任务的id集合作为value.因为是最近5s的任务,所以不会存在需要执
行的任务在一个轮子中放不下的情况。   

基于数组的实现思路

我们可以先定义我们时间轮的大小,并且基于数组实现
private int SIZE=64;
//基于数组实现 相同的index对应一个任务的集合
private Object[] wheelBuffer;

定义我们的任务类包含如下属性:
public abstract static class TimerTask extends Thread{
        //需要延迟的时间
        private int delay;
        //在轮子中的下标
        private int index;

        //第几轮才会执行此任务
        private int cycleNum;
        @Override
        public void run() {
        }
...
}
主要需要提供如下方法:

1-计算任务延迟时间在时间轮中的下标和轮次
有了需要延迟执行的时间delay和时间轮的大小,就可以计算其在时间轮中的下标了。
时间轮的指针每走一格,我们可以用
private AtomicInteger tick=new AtomicInteger();对其计数+1。因为
时间轮在不停转动,因此新增的任务计算下标应该算上已经转动的次数。
计算下标:
delay % (SIZE+tick.get())
计算轮次:
delay/BUFFER_SIZE

2-新增任务到时间轮中
先根据任务延迟时间获取任务在时间轮中的下标和轮次,然后将其加入到对应
下标任务的集合中。
如果已经存在在直接add
Set<TimerTask>taskSets= (Set<TimerTask>) ringBuffer[index];
如果不存在则创建集合并且放入数组
ringBuffer[index]=set;

3-从时间轮中获取任务集合
获取任务的时候通过当前时间轮转动到的index来获取任务集合set,需要遍历判断
任务是否是当前轮次需要执行的任务,如果当前轮次不执行,
则需要将当前下标对应的任务集合进行更新。

4-将任务提交到线程池进行执行
时间轮继续转动,并且通过一个index来计数当前运行到时间轮中的哪个下标的任务
index++
当运行到了时间轮数组大小的时候,重新将index设置为0  
//每秒获取一次任务集合然后执行
Thread.sleep(1000);

总结

时间轮在很多开源框架中都有使用,更多的学习可以看看在dubbo中是如何使用的
1人推荐
随时随地看视频
慕课网APP

热门评论

你好,文章写很棒。有个问题,5秒间隔时间去轮询不会出现某些任务无法按时执行吗?

查看全部评论