small_925_ant
2019-01-07 12:59:26浏览 6084
上一篇介绍了整个调度系统的流程,今天先来介绍一下Quartz
简单使用
集群模式锁的使用
整合springboot
简单使用
/**
 * Demo entry point: registers the {@code HelloQuartz} job on the default
 * scheduler and starts the scheduler.
 *
 * <p>The trigger starts at the fixed timestamp {@code 2018-12-19 15:22:50},
 * fires on a one-minute calendar interval and uses the "do nothing" misfire
 * instruction. Any exception raised while building or scheduling is caught
 * and its stack trace printed.
 *
 * @throws ParseException declared by the signature; failures inside the
 *         {@code try} block are caught here and are not propagated
 */
private static void min() throws ParseException {
    String startText = "2018-12-19 15:22:50";
    SimpleDateFormat startFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    try {
        Scheduler defaultScheduler = StdSchedulerFactory.getDefaultScheduler();

        // Fire once per minute, starting from the fixed timestamp above.
        Trigger everyMinute = newTrigger()
                .withIdentity("trigger1", "group1")
                .startAt(startFormat.parse(startText))
                .withSchedule(
                        calendarIntervalSchedule()
                                .withIntervalInMinutes(1)
                                .withMisfireHandlingInstructionDoNothing())
                .build();

        // The job reads the "name" entry from its data map when it runs.
        JobDetail helloJob = newJob(HelloQuartz.class)
                .withIdentity("job1", "group1")
                .usingJobData("name", "quartz")
                .build();

        defaultScheduler.scheduleJob(helloJob, everyMinute);
        defaultScheduler.start();
    } catch (Exception failure) {
        failure.printStackTrace();
    }
}
// Alternative trigger: starts immediately and repeats on a calendar interval of one day.
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
.startNow()
.withSchedule(calendarIntervalSchedule()
.withIntervalInDays(1))
.build();
// Alternative trigger: first fires at the parsed start time, then repeats every
// second without an upper bound on the number of repetitions.
// repeatForever() sets the repeat count to "indefinite", so no explicit
// withRepeatCount(...) belongs in this chain; it would simply be overridden.
// NOTE(review): the "fire now" misfire instruction is documented by Quartz as
// intended for one-shot triggers; confirm it is the desired policy for a
// trigger that repeats forever.
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
        .startAt(simpleDateFormat.parse(time))
        .withSchedule(simpleSchedule()
                .withIntervalInSeconds(1)
                .repeatForever()
                .withMisfireHandlingInstructionFireNow()
        )
        .build();
// Alternative trigger: starts immediately and repeats on a calendar interval of one hour.
Trigger trigger = newTrigger().withIdentity("trigger1", "group1")
.startNow()
.withSchedule(calendarIntervalSchedule()
.withIntervalInHours(1))
.build();
@DisallowConcurrentExecution
public class HelloQuartz implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDetail detail = context.getJobDetail();
String name = detail.getJobDataMap().getString("name");
SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("==============>测试 say hello to " + name + " at " + simpleDateFormat.format(new Date()).toString());
}
}
配置
org.quartz.scheduler.instanceName: FayaQuartzScheduler
# Auto-generate the scheduler instance id
org.quartz.scheduler.instanceId = AUTO
# Persistence (job store) configuration
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
# Prefix of the Quartz database table names
org.quartz.jobStore.tablePrefix = QRTZ_
# Enable clustered (distributed) deployment
org.quartz.jobStore.isClustered = true
# Interval between cluster node liveness check-ins, in milliseconds
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
# Thread pool implementation class
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
锁的简单介绍
Quartz 通过数据库的行锁来协调集群中的各个节点,保证同一时刻只有一个节点操作调度数据
可以看到quartz提供的表
-- Lock table: each row is identified by the pair (SCHED_NAME, LOCK_NAME),
-- which together form the primary key.
CREATE TABLE `qrtz_locks` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_unicode_ci NOT NULL,
`LOCK_NAME` varchar(40) COLLATE utf8mb4_unicode_ci NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
行级锁:每次操作锁住一行数据。开销大,加锁慢;会出现死锁;锁定粒度最小,发生锁冲突的概率最低,并发度也最高
默认情况下,select 语句不会对数据加写锁,也就是不会阻止其他事务写入;在事务中使用 select ... for update(属于悲观锁)可以对查询到的行加写锁。
Quartz获取锁执行流程
QuartzScheduler被创建时候,会创建一个QuartzSchedulerThread实例,作为调度线程
调度线程会去获取未来 idleWaitTime(默认 30s)内将会被触发的 trigger
// Ask the job store for the next triggers to fire, passing: a time bound of
// now + idleWaitTime, a count capped at min(available threads, max batch size),
// and the configured batch time window.
triggers = this.qsRsrcs.getJobStore().acquireNextTriggers(now + this.idleWaitTime, Math.min(availThreadCount, this.qsRsrcs.getMaxBatchSize()), this.qsRsrcs.getBatchTimeWindow());
此时会先去获取TRIGGER_ACCESS相关的锁
获取连接
// Obtain a connection from the configured data source.
conn = DBConnectionManager.getInstance().getConnection(this.getDataSource());
// Turn off auto-commit on the connection, unless configured not to touch it
if (!this.isDontSetAutoCommitFalse()) {
conn.setAutoCommit(false);
}
会执行SQL
// SQL template that selects one row of the LOCKS table with FOR UPDATE.
// {0} and {1} are template placeholders (presumably the table prefix and the
// scheduler name - confirm against the code that expands them); the lock name
// is bound through the JDBC '?' parameter.
public static final String SELECT_FOR_LOCK = "SELECT * FROM {0}LOCKS WHERE SCHED_NAME = {1} AND LOCK_NAME = ? FOR UPDATE";
可以看到sql使用的 FOR UPDATE,会锁住查询数据的行。
其他线程(或其他节点)执行同样的查询时会被阻塞,直到持有锁的事务提交或回滚后才能继续。
更多数据库锁和quartz的细节大家可以去查看更多的资料,也可以一起讨论
springboot 整合quartz
/**
 * Spring configuration that exposes a Quartz {@code SchedulerFactoryBean}
 * whose job instances are autowired from the application context.
 */
@EnableScheduling
@Configuration
public class QuartzConfiguration {

    /**
     * Creates the job factory used by the scheduler.
     *
     * @param applicationContext context handed to the factory so it can
     *                           autowire the jobs it creates
     * @return a job factory that autowires every job instance
     */
    @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext) {
        AutowiringSpringBeanJobFactory autowiringFactory = new AutowiringSpringBeanJobFactory();
        autowiringFactory.setApplicationContext(applicationContext);
        return autowiringFactory;
    }

    /**
     * Builds the scheduler factory bean from {@code /quartz.properties} on the
     * classpath, backed by the {@code dataSource} bean.
     *
     * @param jobFactory factory used to instantiate jobs
     * @param dataSource data source qualified as {@code "dataSource"}
     * @return the configured scheduler factory bean
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory,
                                                     @Qualifier("dataSource") DataSource dataSource) {
        SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();

        // Where the scheduler settings and its data come from.
        factoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
        factoryBean.setDataSource(dataSource);
        factoryBean.setJobFactory(jobFactory);

        // Registration and startup behaviour.
        factoryBean.setOverwriteExistingJobs(true);
        factoryBean.setAutoStartup(true);
        factoryBean.setStartupDelay(2);
        return factoryBean;
    }

    /**
     * Job factory that autowires each job it creates through the
     * application context's {@code AutowireCapableBeanFactory}.
     */
    public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
            implements ApplicationContextAware {

        private transient AutowireCapableBeanFactory autowirer;

        /** Captures the autowire-capable bean factory of the given context. */
        @Override
        public void setApplicationContext(final ApplicationContext context) {
            this.autowirer = context.getAutowireCapableBeanFactory();
        }

        /** Creates the job via the superclass, then autowires it. */
        @Override
        protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
            final Object jobInstance = super.createJobInstance(bundle);
            autowirer.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}
// Usage snippet: the Quartz Scheduler is obtained through Spring field injection.
@Autowired
private Scheduler scheduler;
总结
完整代码可以去github上查看:https://github.com/lizu18xz/faya-job
热门评论
啦啦啦啦hhhhjjj