手记

elastic-job-lite简单入门

简介

Elastic-Job分为两个独立的子项目:
Elastic-Job-Lite和Elastic-Job-Cloud

Elastic-Job-Lite

简单的说Elastic-Job-Lite就是一个分布式定时任务。

Quick Start


在SpringBoot中实现一个DEMO.

因为项目中使用了很多ZK的特性,首先需要安装一个ZK.
创建一个SpringBoot项目.
快速使用的话首先要引入相关的依赖包:

        <!-- 引入elastic-job -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>

在使用的时候 首先要配置ZK注册中心,然后配置我们需要调度的任务信息

/**
 * 配置ZK注册中心,然后进行初始化
 * */
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {


    @Value("${registry.serverList}")
    private String serverList;

    @Value("${registry.namespace}")
    private String namespace;


    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter zookeeperRegistryCenter() {


        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}

实现一个有业务逻辑的Job,实现SimpleJob接口
/**
 * 基于SpringBoot实现一个简单的Job
 * */
public class SpringBootSimpleJob implements SimpleJob{

    public void execute(ShardingContext shardingContext) {

        // do something

    }
}

@Configuration
public class SimpleJobConfig {

    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    /**
     * 自己实现的job
     * */
    @Bean
    public SimpleJob simpleJob() {
        return new SpringBootSimpleJob();
    }


    /**
     * 将自己实现的job加入调度中执行
     * */
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron,
                                           @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {

        return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
    }

    /**
     * 作业的配置
     * */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron,
                                                         final int shardingTotalCount, final String shardingItemParameters) {

        return LiteJobConfiguration.newBuilder(
                new SimpleJobConfiguration(
                        JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                                .shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();

    }

}

上面就完成了一个简单Job的配置,可以进行测试使用。

源码学习

通过上面创建的demo,整体熟悉一下elastic-job-lite的代码流。

入口:SpringJobScheduler
参数:(ElasticJob, CoordinatorRegistryCenter, LiteJobConfiguration)
可以看到分别传入了:
自己实现的SpringBootSimpleJob
zookeeperRegistryCenter
getLiteJobConfiguration()


getLiteJobConfiguration()方法对我们传入的job信息进行了配置,最后返回了LiteJobConfiguration。
上面的DEMO只传入了少量的参数cron,shardingTotalCount,shardingItemParameters
还有更多的参数可以配置,可以参考JobCoreConfiguration这个类里面的字段信息。


SpringJobScheduler extends JobScheduler:
直接看 JobScheduler 里面

     /**
     * 初始化作业.
     */
    public void init() {

        //去ZK 保存或者更新 job的信息  (ZookeeperRegistryCenter)
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);

        //单列模式实现的一个缓存 (当前分片总数)
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());

        //初始化 JobScheduleController
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());

        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);

        //注册作业的启动信息
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());

        //启动Quartz调度
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }


JobScheduleController初始化:
创建Quartz的调度器
指定Quartz调度时候执行任务的具体实现类LiteJob


registerStartUpInfo注册作业的启动信息:
利用ZK选举当前作业的主节点
持久化作业服务器上线信息
持久化作业运行实例上线相关信息


scheduleJob()加入Quartz调度并且进行调度


上面已经知道了具体在调度中执行的类是LiteJob,继续看LiteJob中的execute()

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }

getJobExecutor:
获取作业执行器,我们使用的是SimpleJobExecutor

execute()方法会走到AbstractElasticJobExecutor中,进行作业的执行。

暂时只关注三个流程
1-获取当前作业服务器的分片上下文.

ShardingContexts shardingContexts = jobFacade.getShardingContexts();

getShardingContexts()方法中做的事情:

判断是否需要分片,如果需要就进行分片(调用了shardingService.shardingIfNecessary()方法进行判断)
在shardingIfNecessary()中会判断当前节点是否是主节点,如果是子节点 在这里 等待分片的完成!!!  不然不能执行后续代码。
进行具体分片:调用JobShardingStrategy.sharding()方法进行分片,有几种实现类,先不管。
分片结束会进程将分片信息保存到ZK:
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
entry.getKey().getJobInstanceId()可以简单理解为一个服务器
大致保存的内容可以理解为:
服务器1 中  有 [0,1,2]分片
服务器2 中  有 [3,4,5]分片


2-分片完成后,获取当前机器需要执行的分片任务!!! (当前作业的主节点会进行分片,子节点会等待主节点分片完成, 最后都会走分片后的逻辑。)

List<Integer> shardingItems = shardingService.getLocalShardingItems();
 
 public List<Integer> getShardingItems(final String jobInstanceId) {
        JobInstance jobInstance = new JobInstance(jobInstanceId);
        if (!serverService.isAvailableServer(jobInstance.getIp())) {
            return Collections.emptyList();
        }
        List<Integer> result = new LinkedList<>();
        int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
        for (int i = 0; i < shardingTotalCount; i++) {
            if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
                result.add(i);
            }
        }
        return result;
    }

可以看到同样是根据getJobInstanceId  来获取 分片任务的,和之前呼应。


3-真正的执行分片的作业
具体代码不展示了
最后会调用抽象方法protected abstract void process(ShardingContext shardingContext);

之前已经知道了具体的实现类:SimpleJobExecutor:

    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);
    }
    
    
 可以看到这里真正调用了我们的 simpleJob (具体实现类SpringBootSimpleJob)  的  execute方法


常用类

对源码中常用的类进行分析

JobRegistry类:作业注册表

JobRegistry是一个单列的类,在项目启动后只有一个实例。在这里面存储了作业的相关信息

     // 作业名称 ---> 作业调度控制器
     Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();
    
     // 作业名称 ---> 注册中心
     Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap<>();
    
     // 作业名称 ---> 作业实例
     Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();
    
     //作业名称 ---> 作业是否运行     获取作业是否在运行
     Map<String, Boolean> jobRunningMap = new ConcurrentHashMap<>();
    
      //作业名称 ---> 作业的分片数量
     Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();



ZookeeperRegistryCenter类:注册中心
ZookeeperRegistryCenter依赖  curator  来实现
初始化的时候回调用init()方法进行启动客户的(client.start();)

注册中心具体提供了Zk操作的一些工具方法
比如创建持久节点,创建临时节点。。

TreeCache:
可以监听指定节点下的节点的变化(所有节点的变化,多级目录)
可以监听的事件包括:节点创建,节点数据的变化,节点的删除
        TreeCache cache = new TreeCache(client, cachePath);
        try {
            cache.start();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
        caches.put(cachePath + "/", cache);

Map<String, TreeCache> caches = new HashMap<>();  //保存了每个监听节点的cache


JobNodePath类:作业节点路径类
基本上所有获取的节点都会加上作业名称(jobName) 



JobNodeStorage类:作业节点数据访问类


LeaderService类:主节点的服务类
先通过LeaderLatch 设置 主节点,如果leader节点选举成功,会保存作业实例节点到ZK


ServerService类:作业服务器服务
ServerService提供了注册上线的服务器的信息(jobName/server/ip)
提供了hasAvailableServers获取可用服务器信息(jobName/server)

和xxl-job的对比


之了解过xxl-job,感觉整体流程不一样
xxl-job会有一个Master节点,利用RPC分发任务到worker节点。
worker节点启动后会注册到zk或者mysql.  Master节点分配任务会根据任务配置 和 负载策略选择一个worker进行任务的执行。
xxl-job整体调度的实现也是使用Quartz.



elastic-job整体感觉是启动节点后,会先选择一个主节点,也会将信息保存到zk中,(主节点和其他节点都是调度的工作节点,只不过主节点需要完成分片的功能),然后添加作业到调度
在调度进行的时候(主节点)进行分片的配置,从节点会等待分片完成,配置完成后,主节点和从节点会进行获取当前节点分片的任务,执行具体的job.
更多的使用了ZK.

2人推荐
随时随地看视频
慕课网APP