small_925_ant
2019-07-14 21:27:20浏览 5004
简介
Elastic-Job分为两个独立的子项目:
Elastic-Job-Lite和Elastic-Job-Cloud
Elastic-Job-Lite
简单的说Elastic-Job-Lite就是一个分布式定时任务。
Quick Start
在SpringBoot中实现一个DEMO.
因为项目中使用了很多ZK的特性,首先需要安装一个ZK.
创建一个SpringBoot项目.
快速使用的话首先要引入相关的依赖包:
<!-- 引入elastic-job -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
在使用的时候 首先要配置ZK注册中心,然后配置我们需要调度的任务信息
/**
* 配置ZK注册中心,然后进行初始化
* */
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {
@Value("${registry.serverList}")
private String serverList;
@Value("${registry.namespace}")
private String namespace;
@Bean(initMethod = "init")
public ZookeeperRegistryCenter zookeeperRegistryCenter() {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
实现一个有业务逻辑的Job,实现SimpleJob接口
/**
* 基于SpringBoot实现一个简单的Job
* */
public class SpringBootSimpleJob implements SimpleJob{
public void execute(ShardingContext shardingContext) {
// do something
}
}
@Configuration
public class SimpleJobConfig {
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* 自己实现的job
* */
@Bean
public SimpleJob simpleJob() {
return new SpringBootSimpleJob();
}
/**
* 将自己实现的job加入调度中执行
* */
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron,
@Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, zookeeperRegistryCenter,
getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
}
/**
* 作业的配置
* */
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron,
final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(
new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
}
}
上面就完成了一个简单Job的配置,可以进行测试使用。
源码学习
通过上面创建的demo,整体熟悉一下elastic-job-lite的代码流。
入口:SpringJobScheduler
参数:(ElasticJob, CoordinatorRegistryCenter, LiteJobConfiguration)
可以看到分别传入了:
自己实现的SpringBootSimpleJob
zookeeperRegistryCenter
getLiteJobConfiguration()
getLiteJobConfiguration()方法对我们传入的job信息进行了配置,最后返回了LiteJobConfiguration。
上面的DEMO只传入了少量的参数cron,shardingTotalCount,shardingItemParameters
还有更多的参数可以配置,可以参考JobCoreConfiguration这个类里面的字段信息。
SpringJobScheduler extends JobScheduler:
直接看 JobScheduler 里面
/**
* 初始化作业.
*/
public void init() {
//去ZK 保存或者更新 job的信息 (ZookeeperRegistryCenter)
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
//单列模式实现的一个缓存 (当前分片总数)
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
//初始化 JobScheduleController
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
//注册作业的启动信息
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
//启动Quartz调度
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
JobScheduleController初始化:
创建Quartz的调度器
指定Quartz调度时候执行任务的具体实现类LiteJob
registerStartUpInfo注册作业的启动信息:
利用ZK选举当前作业的主节点
持久化作业服务器上线信息
持久化作业运行实例上线相关信息
scheduleJob()加入Quartz调度并且进行调度
上面已经知道了具体在调度中执行的类是LiteJob,继续看LiteJob中的execute()
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
getJobExecutor:
获取作业执行器,我们使用的是SimpleJobExecutor
execute()方法会走到AbstractElasticJobExecutor中,进行作业的执行。
暂时只关注三个流程
1-获取当前作业服务器的分片上下文.
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
getShardingContexts()方法中做的事情:
判断是否需要分片,如果需要就进行分片(调用了shardingService.shardingIfNecessary()方法进行判断)
在shardingIfNecessary()中会判断当前节点是否是主节点,如果是子节点 在这里 等待分片的完成!!! 不然不能执行后续代码。
进行具体分片:调用JobShardingStrategy.sharding()方法进行分片,有几种实现类,先不管。
分片结束会进程将分片信息保存到ZK:
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
entry.getKey().getJobInstanceId()可以简单理解为一个服务器
大致保存的内容可以理解为:
服务器1 中 有 [0,1,2]分片
服务器2 中 有 [3,4,5]分片
2-分片完成后,获取当前机器需要执行的分片任务!!! (当前作业的主节点会进行分片,子节点会等待主节点分片完成, 最后都会走分片后的逻辑。)
List<Integer> shardingItems = shardingService.getLocalShardingItems();
public List<Integer> getShardingItems(final String jobInstanceId) {
JobInstance jobInstance = new JobInstance(jobInstanceId);
if (!serverService.isAvailableServer(jobInstance.getIp())) {
return Collections.emptyList();
}
List<Integer> result = new LinkedList<>();
int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
for (int i = 0; i < shardingTotalCount; i++) {
if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
result.add(i);
}
}
return result;
}
可以看到同样是根据getJobInstanceId 来获取 分片任务的,和之前呼应。
3-真正的执行分片的作业
具体代码不展示了
最后会调用抽象方法protected abstract void process(ShardingContext shardingContext);
之前已经知道了具体的实现类:SimpleJobExecutor:
@Override
protected void process(final ShardingContext shardingContext) {
simpleJob.execute(shardingContext);
}
可以看到这里真正调用了我们的 simpleJob (具体实现类SpringBootSimpleJob) 的 execute方法
常用类
对源码中常用的类进行分析
JobRegistry类:作业注册表
JobRegistry是一个单列的类,在项目启动后只有一个实例。在这里面存储了作业的相关信息
// 作业名称 ---> 作业调度控制器
Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();
// 作业名称 ---> 注册中心
Map<String, CoordinatorRegistryCenter> regCenterMap = new ConcurrentHashMap<>();
// 作业名称 ---> 作业实例
Map<String, JobInstance> jobInstanceMap = new ConcurrentHashMap<>();
//作业名称 ---> 作业是否运行 获取作业是否在运行
Map<String, Boolean> jobRunningMap = new ConcurrentHashMap<>();
//作业名称 ---> 作业的分片数量
Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();
ZookeeperRegistryCenter类:注册中心
ZookeeperRegistryCenter依赖 curator 来实现
初始化的时候回调用init()方法进行启动客户的(client.start();)
注册中心具体提供了Zk操作的一些工具方法
比如创建持久节点,创建临时节点。。
TreeCache:
可以监听指定节点下的节点的变化(所有节点的变化,多级目录)
可以监听的事件包括:节点创建,节点数据的变化,节点的删除
TreeCache cache = new TreeCache(client, cachePath);
try {
cache.start();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
caches.put(cachePath + "/", cache);
Map<String, TreeCache> caches = new HashMap<>(); //保存了每个监听节点的cache
JobNodePath类:作业节点路径类
基本上所有获取的节点都会加上作业名称(jobName)
JobNodeStorage类:作业节点数据访问类
LeaderService类:主节点的服务类
先通过LeaderLatch 设置 主节点,如果leader节点选举成功,会保存作业实例节点到ZK
ServerService类:作业服务器服务
ServerService提供了注册上线的服务器的信息(jobName/server/ip)
提供了hasAvailableServers获取可用服务器信息(jobName/server)
和xxl-job的对比
之了解过xxl-job,感觉整体流程不一样
xxl-job会有一个Master节点,利用RPC分发任务到worker节点。
worker节点启动后会注册到zk或者mysql. Master节点分配任务会根据任务配置 和 负载策略选择一个worker进行任务的执行。
xxl-job整体调度的实现也是使用Quartz.
elastic-job整体感觉是启动节点后,会先选择一个主节点,也会将信息保存到zk中,(主节点和其他节点都是调度的工作节点,只不过主节点需要完成分片的功能),然后添加作业到调度
在调度进行的时候(主节点)进行分片的配置,从节点会等待分片完成,配置完成后,主节点和从节点会进行获取当前节点分片的任务,执行具体的job.
更多的使用了ZK.