elastic job原理解析-JobSchedule
jobschedule完成了很多和quartz相关的内容,并且也会将服务信息注册到zk上,并且做好相关的job信息的cache。在整个任务的调度中,jobSchedule完成了elastic job主干流程。
JobSchedule
elastic job在启动的时候需要注册bean schedule,调用init的方法,完成初始化配置信息。具体操作如下
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}
SpringJobScheduler继承自JobScheduler,容器启动的时候用调用schedule的init方法,jobSchedule和springJobScheduler过程如下:
接下取看jobSchedule。
public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
private final LiteJobConfiguration liteJobConfig;
private final CoordinatorRegistryCenter regCenter;
// TODO 为测试使用,测试用例不能反复new monitor service,以后需要把MonitorService重构为单例
@Getter
private final SchedulerFacade schedulerFacade;
private final JobFacade jobFacade;
jobSchedule中包含job的配置信息,注册中心,schedulerFacade,jobFacade。其中注册中心和配置中心信息来自上面的springSchedule。注意的是两个facade当中含有大量的配置接口。
在注册bean调用init方法之后,会完成quartz中job的创建。
public void init() {
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
jobSchedule在init中完成了以下操作:
elastic job将quartz的相关操作集成在**作业调度控制器【JobScheduleController】**中完成作业的启动,暂停等操作
创建调度
private Scheduler createScheduler() {
Scheduler result;
try {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
return result;
}
创建作业
private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
作业调度
public void scheduleJob(final String cron) {
try {
if (!scheduler.checkExists(jobDetail.getKey())) {
scheduler.scheduleJob(jobDetail, createTrigger(cron));
}
scheduler.start();
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
作业注册
elastic job在启动调度之前会将作业相关信息注册到zk上
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
_作业注册表【JobRegistry】_在zk上注册路径,并将jobScheduleController,regCenter缓存到map中,后面在任务发生变化的时候从map中取出相关信息
public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
schedulerMap.put(jobName, jobScheduleController);
regCenterMap.put(jobName, regCenter);
regCenter.addCacheData("/" + jobName);
}
注册启动信息
这个由schedulerFacade完成里面有相当多的东西。后面再说