SpringBoot 整合 Quartz 实现分布式定时任务

1. Quartz 简介

  • Quartz is a richly featured, open source job scheduling library that can be integrated within virtually any Java application - from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs; jobs whose tasks are defined as standard Java components that may execute virtually anything you may program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.
  • 特点:
    • 支持分布式高可用:多节点部署时,使用 @Scheduled 等方式会导致每个节点都执行同一任务;Quartz 集群模式下各节点共享同一数据库,通过数据库锁保证同一次触发只由一个节点执行,某个节点宕机后任务可由其他节点接管
    • 支持持久化:可将任务存储到 MySQL 等
    • 支持多任务调度和管理:可以同时调度多个任务,可以实现任务的增删改查等管理
  • 组成:
    • JobDetail:任务详情
    • Trigger:触发器,指定任务执行的时间
    • Scheduler:调度器,用来调度、暂停和删除任务,可以注册多个 JobDetail 和 Trigger

2. SpringBoot 整合 Quartz

  • 添加 Maven 依赖:
<!--
  Spring Boot starter that pulls in Quartz and Spring's scheduling support.
  NOTE(review): if this project inherits spring-boot-starter-parent (or imports the
  spring-boot-dependencies BOM), the explicit version below is presumably redundant and
  could drift from the Boot version in use - confirm against the project's pom.
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>2.6.7</version>
</dependency>
  • 配置 Quartz(application.yml):
# Quartz configuration for Spring Boot (application.yml).
# Nesting matters: everything under `properties` is handed to Quartz as native
# `org.quartz.*` settings.
spring:
  quartz:
    job-store-type: jdbc # default is in-memory ("memory"); use the database-backed store here
    wait-for-jobs-to-complete-on-shutdown: true # wait for running jobs to finish on shutdown
    overwrite-existing-jobs: true # allow already-stored jobs to be overwritten
    jdbc:
      initialize-schema: never # do not run the Quartz schema SQL automatically; tables must already exist
    properties: # native Quartz properties
      org:
        quartz:
          # Scheduler properties
          scheduler:
            instanceName: MyScheduler # scheduler name; must be identical on every node of one cluster
            instanceId: AUTO # generate a unique instance id per node
          # JobStore properties
          jobStore:
            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore # JobStore implementation
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate # delegate for fully JDBC-compliant drivers
            tablePrefix: QRTZ_ # prefix of the Quartz tables
            isClustered: true # required when several nodes share the same Quartz tables
            useProperties: false # whether JobDataMap values are stored as strings only
          # Thread pool properties
          threadPool:
            threadCount: 5 # pool size (default: 10)
            threadPriority: 5 # thread priority
            class: org.quartz.simpl.SimpleThreadPool # fixed-size thread pool implementation

3. 基本使用

// JobDetail: WHAT to run - the job class, its identity and its input data.
JobDetail jobDetail = JobBuilder
        .newJob(MyJob.class)                      // job class to execute
        .withIdentity("myJobName", "myJobGroup")  // unique key (name + group)
        .usingJobData("key1", "value1")           // parameter passed to the job
        .storeDurably()                           // keep the job stored even when no trigger points to it
        .build();

// Simple trigger: fires once, immediately (interval 0, repeat count 0).
// It gets its own key: two triggers with the same name + group cannot both be scheduled.
Trigger simpleTrigger = TriggerBuilder.newTrigger()
        .withIdentity("mySimpleTriggerName", "myTriggerGroup") // unique key
        .withSchedule(SimpleScheduleBuilder
                .simpleSchedule()
                .withRepeatCount(0))
        .usingJobData("key1", "value1")
        .startNow()
        .build();

// Cron trigger: fires at minute 0 of every hour.
// startNow() alone is enough; a preceding startAt(...) would simply be overwritten by it.
Trigger cronTrigger = TriggerBuilder.newTrigger()
        .withIdentity("myTriggerName", "myTriggerGroup") // unique key
        .withSchedule(CronScheduleBuilder
                .cronSchedule("0 0 * * * ?"))
        .usingJobData("key1", "value1")
        .startNow()
        .build();

// Scheduler: the StdSchedulerFactory default scheduler is the commonly used one.
Scheduler schedulerStd = StdSchedulerFactory.getDefaultScheduler();

// DirectSchedulerFactory must create a scheduler before getScheduler() can return it;
// calling getScheduler() first throws a SchedulerException.
DirectSchedulerFactory.getInstance().createVolatileScheduler(5); // in-memory store, 5 worker threads
Scheduler schedulerDir = DirectSchedulerFactory.getInstance().getScheduler();

schedulerStd.scheduleJob(jobDetail, cronTrigger);
schedulerStd.start();

4. 应用启动时自动调用

/**
 * Declares a durable {@code MyJob} job and a one-shot trigger for it as Spring beans.
 *
 * <p>NOTE(review): presumably Spring Boot's Quartz auto-configuration picks these beans up
 * and schedules them at startup - confirm against the application's setup.
 */
@Configuration
public class QuartzConfig {

    /** Name shared by the job key and the trigger key (both in the default group). */
    private static final String SCHEDULER_JOB = "schedulerJob";

    /**
     * Builds the job definition.
     *
     * @return a durable job detail for {@code MyJob} named {@code schedulerJob}
     */
    @Bean
    public JobDetail scheduleJobDetail() {
        JobBuilder jobBuilder = JobBuilder.newJob(MyJob.class);
        jobBuilder.withIdentity(SCHEDULER_JOB);
        jobBuilder.storeDurably();
        return jobBuilder.build();
    }

    /**
     * Builds the trigger bound to {@link #scheduleJobDetail()}.
     *
     * @return a trigger that starts now and does not repeat
     */
    @Bean
    public Trigger scheduleJobDetailTrigger() {
        // Repeat count 0: a single firing, no repeats.
        SimpleScheduleBuilder fireOnce = SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0);
        return TriggerBuilder.newTrigger()
                .withIdentity(SCHEDULER_JOB)
                .forJob(scheduleJobDetail())
                .startNow()
                .withSchedule(fireOnce)
                .build();
    }
}

5. 接口方式动态调用

/**
 * Operations for managing Quartz jobs at runtime: add, reschedule, delete, pause, resume,
 * trigger immediately, and list. Jobs are addressed by job name plus job group name.
 */
public interface QuartzService {

/**
 * Adds a job that runs on a fixed interval.
 *
 * @param jobClass job class to execute
 * @param jobName job name
 * @param jobGroupName job group name
 * @param jobTime interval between runs, in seconds
 * @param repeatCount number of times the job runs (unlimited if &lt; 0).
 *        NOTE(review): if the implementation hands this value to Quartz as a repeat
 *        count, the total number of firings is presumably repeatCount + 1 - confirm
 * @param jobData data passed to the job
 */
void addSimpleJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName,
int jobTime, int repeatCount, Map<String, Object> jobData);

/**
 * Adds a job that runs on a cron schedule.
 *
 * @param jobClass job class to execute
 * @param jobName job name
 * @param jobGroupName job group name
 * @param cron cron expression
 * @param jobData data passed to the job
 */
void addCronJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName,
String cron, Map<String, Object> jobData);

/** Replaces the cron expression of the given job. */
void updateCronJob(String jobName, String jobGroupName, String cron);

/** Deletes the given job. */
void deleteJob(String jobName, String jobGroupName);

/** Pauses the given job. */
void pauseJob(String jobName, String jobGroupName);

/** Resumes the given job. */
void resumeJob(String jobName, String jobGroupName);

/** Runs the given job immediately. */
void runJobNow(String jobName, String jobGroupName);

/** Lists all jobs; each entry is a map of job attributes. */
List<Map<String, Object>> queryAllJob();

/** Lists the jobs that are running; each entry is a map of job attributes. */
List<Map<String, Object>> queryRunningJob();
}
  • 实现类:
@Slf4j
@Service
public class QuartzServiceImpl implements QuartzService {

@Autowired
private Scheduler scheduler;

@PostConstruct
public void startScheduler() {
try {
scheduler.start();
} catch (SchedulerException e) {
e.printStackTrace();
}
}

@Override
public void addSimpleJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName,
int jobTime, int repeatCount, Map<String, Object> jobData) {
try {
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.withIdentity(jobName, jobGroupName)
.build();
if (jobData != null && jobData.size() > 0) {
jobDetail.getJobDataMap().putAll(jobData);
}
SimpleScheduleBuilder scheduleBuilder;
if (repeatCount < 0) {
scheduleBuilder = SimpleScheduleBuilder
.repeatSecondlyForever(1)
.withIntervalInSeconds(jobTime);
} else {
scheduleBuilder = SimpleScheduleBuilder
.repeatSecondlyForever(1)
.withIntervalInSeconds(jobTime)
.withRepeatCount(repeatCount);
}
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(jobName, jobGroupName)
.withSchedule(scheduleBuilder)
.startNow().build();
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("add simple job error!");
}
}

@Override
public void addCronJob(Class<? extends QuartzJobBean> jobClass, String jobName,
String jobGroupName, String cron, Map<String, Object> jobData) {
try {
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.withIdentity(jobName, jobGroupName)
.build();
if (jobData != null && jobData.size() > 0) {
jobDetail.getJobDataMap().putAll(jobData);
}
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(jobName, jobGroupName)
.startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND))
.withSchedule(CronScheduleBuilder.cronSchedule(cron))
.startNow()
.build();
scheduler.scheduleJob(jobDetail, trigger);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("add cron job error");
}
}

@Override
public void updateCronJob(String jobName, String jobGroupName, String cron) {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
trigger = trigger.getTriggerBuilder()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule(cron))
.build();
// 重启触发器
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("update job error");
}
}

@Override
public void deleteJob(String jobName, String jobGroupName) {
try {
scheduler.deleteJob(new JobKey(jobName, jobGroupName));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("delete job error");
}
}

@Override
public void pauseJob(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("pause job error");
}
}

@Override
public void resumeJob(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("resume job error");
}
}

@Override
public void runJobNow(String jobName, String jobGroupName) {
try {
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("run job error");
}
}

@Override
public List<Map<String, Object>> queryAllJob() {
List<Map<String, Object>> jobList;
try {
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyJobGroup());
jobList = new ArrayList<>();
for (JobKey jobKey : jobKeys) {
log.info("maps: {}", scheduler.getJobDetail(jobKey).getJobDataMap().getWrappedMap());
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
Map<String, Object> map = new HashMap<>();
map.put("jobName", jobKey.getName());
map.put("jobGroupName", jobKey.getGroup());
map.put("description", "触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression);
}
jobList.add(map);
}
}
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("query all jobs error");
}
return jobList;
}

@Override
public List<Map<String, Object>> queryRunningJob() {
List<Map<String, Object>> jobList;
try {
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
jobList = new ArrayList<>(executingJobs.size());
for (JobExecutionContext executingJob : executingJobs) {
Map<String, Object> map = new HashMap<>();
JobDetail jobDetail = executingJob.getJobDetail();
JobKey jobKey = jobDetail.getKey();
Trigger trigger = executingJob.getTrigger();
map.put("jobName", jobKey.getName());
map.put("jobGroupName", jobKey.getGroup());
map.put("description", "触发器:" + trigger.getKey());
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
map.put("jobStatus", triggerState.name());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
map.put("jobTime", cronExpression);
}
jobList.add(map);
}
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("query run jobs error");
}
return jobList;
}
}

参考