手记

elastic job原理解析-JobSchedule

elastic job原理解析-JobSchedule

jobschedule完成了很多和quartz相关的内容,并且也会将服务信息注册到zk上,并且做好相关的job信息的cache。在整个任务的调度中,jobSchedule完成了elastic job主干流程。

JobSchedule

elastic job在启动的时候需要注册bean,spring容器启动调用init的方法,完成初始化配置信息。具体代码如下

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron, @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
    }

SpringJobScheduler继承自JobScheduler,容器启动的时候用调用schedule的init方法,
jobSchedule和springJobScheduler之间关系如下:

jobschedule作为父类完成了90%的springJobSchedlue的工作,SpringJobScheduler仅仅完成获取listener的行为然后调用父类构造函数,这个行为本身不是自己需要,而是父类构造函数中需要。可以认为SpringJobScheduler 存在就是为了调用JobScheduler的构造器而准备一些的config。

接下去看jobSchedule中关于quartz的一些核心代码。

public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
    
    private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
    
    private final LiteJobConfiguration liteJobConfig;
    
    private final CoordinatorRegistryCenter regCenter;
    @Getter
    private final SchedulerFacade schedulerFacade;
    
    private final JobFacade jobFacade;

jobSchedule中包含job的配置信息,注册中心,schedulerFacade,jobFacade。其中注册中心和配置中心信息来自上面的springSchedule。注意的是两个facade当中含有大量的配置接口。

在注册bean调用init方法之后,会完成quartz中job的创建。

    public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

jobSchedule在init中完成了以下操作:

elastic job将quartz的相关操作集成在 作业调度控制器【JobScheduleController】 中完成作业的启动,暂停等操作

创建调度

private Scheduler createScheduler() {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties());
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }

创建quartz中SchedulerFactory

创建作业

private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }

作业调度

public void scheduleJob(final String cron) {
        try {
            if (!scheduler.checkExists(jobDetail.getKey())) {
                scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }

jobSchedule在init中调用了 作业调度控制器【JobScheduleController】 的scheduleJob,到这里elsatic job完成了作业的调度。

作业注册

elastic job不会仅仅满足于quartz的启动,他会在启动调度之前会将作业相关信息注册到zk上

JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);

作业注册表【JobRegistry】 在zk上注册路径,并将jobScheduleController,regCenter缓存到map中,后面在任务发生变化的时候从map中取出相关信息

public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
        schedulerMap.put(jobName, jobScheduleController);
        regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }

注册启动信息

这个有 schedulerFacade 完成,里面有相当多的东西。下一章节单独描述。

0人推荐
随时随地看视频
慕课网APP