HelloStranger

每个人都是初学者

基于Quartz的调度扩展实现总结

quartz

基于Quartz的调度扩展实现总结

1.对于JobDetail和Trigger的通俗理解

在Quartz框架中,一个任务(Job)的执行需要JobDetail和Trigger的同时支持:

JobDetail: JobDetail是对任务的描述,即解决了该任务要做什么,怎么做,用什么做的问题。”要做什么”和”怎么做”是我们根据业务逻辑自己实现的代码逻辑,而”用什么做”则是执行该Job的参数,依赖于框架之外在构建JobDetail时设置的参数。
Trigger:Trigger种类多样,他描述了这个任务的触发时间规则,即解决了该任务要什么时候做的问题。

根据特定的业务逻辑使用Quartz的过程,基本就是根据业务逻辑构建JobDetail和Trigger的过程。

2.关于JobDetail类中durability字段和shouldRecover字段的含义

下面是JavaDoc上的说明:

durability : 作业是否应在孤立后保留存储(没有触发器指向它)。即假如没有触发器指向该JobDeatil是否要持久化到对应的储存中去。

Whether or not the Job should remain stored after it is orphaned (no Triggers point to it).

shouldRecover : 如果遇到“恢复”或“故障转移”情况,则指示调度程序是否应重新执行作业。最简单的例子就是当Job正在执行时遇到系统崩溃,重启服务后是否要重新执行该Job。

Instructs the Scheduler whether or not the Job should be re-executed if a ‘recovery’ or ‘fail-over’ situation is encountered.

3.JobListener在服务重启后不生效

现象描述

将相关信息持久化到Mysql

org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX

创建了一个任务A并创建JobListener对象JobListenerA分配给任务A
服务重启后,任务A正常执行,但是JobListenerA不生效。

原因

在Quartz框架中,JobListener是不会被持久化的,只是被储存到JVM的内存中去,当服务重启时,原有的内存会被销毁,服务重启后是使用的新开辟的内存,并不包含上一次的内容,所以JobListener在服务重启后不会生生效。任务A会在重启之后继续执行,是因为已经将其相关信息持久化到了Mysql中。
Quartz官网上有以下一段描述:

Listeners are registered with the scheduler during run time, and are NOT stored in the JobStore along with the jobs and triggers. This is because listeners are typically an integration point with your application. Hence, each time your application runs, the listeners need to be re-registered with the scheduler.
中文:监听器在运行时在调度程序中注册,并且不与作业和触发器一起存储在JobStore中。这是因为侦听器通常是与您的应用程序的集成点。因此,每次运行(重启)应用程序时,都需要使用调度程序重新注册侦听器。

解决办法

该原因与问题4原因本质一致.解决办法请见

4.JobListener在集群模式(Cluster)下失效原因

现象描述

开启了集群部署模式

org.quartz.jobStore.isClustered: true

将相关信息持久化到Mysql

org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX

所有的JobListener在单实例状态下都是有效的,但是当系统部署到线上4个实例上的时候,就会出现一个任务运行完之后为其添加的JobListener不生效。

原因

在上述配置条件下,JobDetail对于集群内的所有实例都是共享可见的,但是对于JobListener任务监听器来说,集群之间的实例却没有共享,因为JobListener并没有持久化到Mysql中。所以很容易会出现下面一种情形:

假设集群模式下有实例A、B、C、D,我们在实例A上创建了一个JobDetail1以及触发器Trigger1,并创建了一个监听器JobListener1分配给了JobDetail1,最后提交到Quartz调度中去,此时Quartz会把实例A提交的JobDetail1和Trigger1持久化到Mysql中去,但是JobListener1没有持久化到Mysql而是储存到内存中,此时JobDetail1和Trigger1是在集群范围内每个实例都共享的,但是JobListener1却只是存储在实例A上,假如该任务被Quartz调度在别的实例(实例B、C、D)拉起,那么这个实例内存中并没有储存关于JobListener1的任务信息,在一开始除了实例A以外的实例就没有初始化该监听器,所以监听器也不会生效。

解决办法

该原因与问题3原因本质一致.解决办法请见

5.关于Job类的@PersistJobDataAfterExecution与@DisallowConcurrentExecution两个注解

官方释意

Job State and Concurrency
Now, some additional notes about a job’s state data (aka JobDataMap) and concurrency. There are a couple annotations that can be added to your Job class that affect Quartz’s behavior with respect to these aspects.

@DisallowConcurrentExecution is an annotation that can be added to the Job class that tells Quartz not to execute multiple instances of a given job definition (that refers to the given job class) concurrently.
Notice the wording there, as it was chosen very carefully. In the example from the previous section, if “SalesReportJob” has this annotation, than only one instance of “SalesReportForJoe” can execute at a given time, but it can execute concurrently with an instance of “SalesReportForMike”. The constraint is based upon an instance definition (JobDetail), not on instances of the job class. However, it was decided (during the design of Quartz) to have the annotation carried on the class itself, because it does often make a difference to how the class is coded.

@PersistJobDataAfterExecution is an annotation that can be added to the Job class that tells Quartz to update the stored copy of the JobDetail’s JobDataMap after the execute() method completes successfully (without throwing an exception), such that the next execution of the same job (JobDetail) receives the updated values rather than the originally stored values. Like the @DisallowConcurrentExecution annotation, this applies to a job definition instance, not a job class instance, though it was decided to have the job class carry the attribute because it does often make a difference to how the class is coded (e.g. the ‘statefulness’ will need to be explicitly ‘understood’ by the code within the execute method).

If you use the @PersistJobDataAfterExecution annotation, you should strongly consider also using the @DisallowConcurrentExecution annotation, in order to avoid possible confusion (race conditions) of what data was left stored when two instances of the same job (JobDetail) executed concurrently.

翻译如下:

作业状态和并发
现在,关于作业的状态数据(也称为JobDataMap)和并发性的一些附加说明。有几个注释可以添加到您的Job类中,这些注释会影响Quartz在这些方面的行为。

@DisallowConcurrentExecution是一个可以添加到Job类的注释,它告诉Quartz不要同时执行给定作业定义的多个实例(指向给定的作业类)。
注意那里的措辞,因为它是非常谨慎地选择的。在上一节的示例中,如果“SalesReportJob”具有此批注,则只能在给定时间执行“SalesReportForJoe”的一个实例,但它可以与“SalesReportForMike”实例同时执行。约束基于实例定义(JobDetail),而不是基于作业类的实例。然而,决定(在Quartz的设计期间)要对类本身进行注释,因为它通常会对类的编码方式产生影响。

@PersistJobDataAfterExecution是一个注释,可以添加到Job类,告诉Quartz在execute()方法成功完成后(不抛出异常)更新JobDetail的JobDataMap的存储副本,以便下一次执行相同的工作( JobDetail)接收更新的值而不是最初存储的值。与 @DisallowConcurrentExecution批注一样,这适用于作业定义实例,而不是作业类实例,尽管决定让作业类携带属性,因为它通常会对类的编码方式产生影响(例如’有状态’ ‘将需要由execute方法中的代码明确地’理解’)。

如果使用@PersistJobDataAfterExecution批注,则应强烈考虑使用 @DisallowConcurrentExecution批注,以避免在同时执行同一作业(JobDetail)的两个实例时可能存在的数据混乱(竞争条件)。

上面不要同时执行给定作业定义的多个实例的意思是说,对于不同的任务,有且最多只有一个对应的Job实例运行执行器逻辑,即对于一个唯一确定的JobKey,有且最多只有一个线程实例运行该JobKey对应的任务,与JobKey对应运行的JobDetail中JobDataMap是否与其他JobKey对应的JobDataMap是否相等无关。

6.关于Job出错的监控

新加JobListener继承下面的PipelineSingleJobListener类,实现抽象方法jobWasExecutedError,在该抽象方法的实现类里面编写相关的出错的逻辑即可。其中参数的意义如下:(int reFireCount, Map map, JobExecutionException jobException)

  • reFireCount:该任务的重试次数
  • map:执行该任务的参数Map
  • jobException:储存业务逻辑报错信息的实体,而且可以设置任务报错后对任务的处理动作,对于该类的详细解释请参见问题7

展开源码:PipelineSingleJobListener


public abstract class PipelineSingleJobListener extends JobListenerSupport {

private String name;

public PipelineSingleJobListener(String name) {
if (name == null) {
throw new IllegalArgumentException("Listener name cannot be null!");
} else {
this.name = name;
}
}

@Override
public String getName() {
return this.name;
}

/
(1) 任务执行之前执行
Called by the Scheduler when a JobDetail is about to be executed (an associated Trigger has occurred). /
public void jobToBeExecuted(JobExecutionContext context) {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
Map map = JobScheduleCreator.jobDataMapToMap(jobDataMap);
jobToBeExecuted(map);
context.getJobDetail().getJobDataMap().putAll(map);
}

/

(2) 这个方法正常情况下不执行,但是如果当TriggerListener中的vetoJobExecution方法返回true时,那么执行这个方法.
需要注意的是 如果方法(2)执行 那么(1),(3)这个俩个方法不会执行,因为任务被终止了. Called by the Scheduler when a JobDetail was about to be executed (an associated Trigger has occurred),
but a TriggerListener vetoed it's execution. /
public void jobExecutionVetoed(JobExecutionContext context) {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
Map map = JobScheduleCreator.jobDataMapToMap(jobDataMap);
jobExecutionVetoed(map);
context.getJobDetail().getJobDataMap().putAll(map);
}

/* (3)
任务执行完成后执行,jobException如果它不为空则说明任务在执行过程中出现了异常 Called by the Scheduler after a JobDetail has been executed, and be for the associated Trigger's triggered(xx) method has been called.
*/
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
Map map = JobScheduleCreator.jobDataMapToMap(jobDataMap);
if (jobException == null) {
// 程序正常执行
jobWasExecuted(map);
} else {
// 程序运行出错
int reFireCount = context.getRefireCount();
jobWasExecutedError(reFireCount, map, jobException);
}
context.getJobDetail().getJobDataMap().putAll(map);
}

protected abstract void jobToBeExecuted(Map map);

protected abstract void jobExecutionVetoed(Map map);

protected abstract void jobWasExecuted(Map map);

protected abstract void jobWasExecutedError(int reFireCount, Map map, JobExecutionException jobException);
}

7.关于对JobExecutionException类的使用

JavaDoc里面对该类的作用有如下解释:

An exception that can be thrown by a Job to indicate to the Quartz Scheduler that an error occurred while executing, and whether or not the Job requests to be re-fired immediately (using the same JobExecutionContext, or whether it wants to be unscheduled.

Note that if the flag for ‘refire immediately’ is set, the flags for unscheduling the Job are ignored.

通俗的来说就是,当我们自己实现的Job类的内部逻辑出错时(此时原因是我们自己实现的逻辑问题),可以抛出异常,此时Quartz调度程序会捕获该部分异常,并提供Job出现异常后的处理办法,比如当我们知道Job在执行过程中出错之后,是否要设置重跑任务,是否要停止调度一切关于该任务的触发器,值得注意的是,假如说我们设置了立即触发,停止调度的标签(类的内部成员变量)将会被忽略。

8.SpringBoot与Quartz结合使用的过程中,自动将Job实例注册为Spring Bean

  1. 新建SpringBeanJobFactory的子类并重写setApplicationContexcreateJobInstance方法

    展开源码:SchedulerJobFactory


    public class SchedulerJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {

    private AutowireCapableBeanFactory beanFactory;

    public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException {
    beanFactory = applicationContext.getAutowireCapableBeanFactory();
    }

    protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
    final Object job = super.createJobInstance(bundle);
    beanFactory.autowireBean(job);
    return job;
    }
    }

  2. 使用SchedulerFactoryBean类作为Quartz使用的Bean

9.在集群模式下,关于父子任务(任务依赖关系)的思考

倘若在单实例的模式下,有关父子任务的事情可以轻松的使用JobChainingJobListener类来完成这一需求,使用这个监听器只需要使用方法:

addJobChainLink(JobKey firstJob, JobKey secondJob)

但是由于监听器不会被持久化下来,所以在集群模式下的父子任务的实现需要思考。解决集群模式下的父子任务问题从本质上来说是解决”父子任务依赖关系如何持久化的问题”。关于对应的解决方法,有以下两种思考:

  1. 将父子任务的依赖关系持久化到Mysql中,使其对集群可见

关于以上利弊,分开来说:

持久化到Mysql中

将父子任务的依赖关系持久化到Mysql中,使其依赖关系对全局可见,可以通过如下步骤实现。

  1. 初始化一张Mysql表,主要记录了父子任务对的依赖关系以及使能状态等
  2. Coding一套针对于依赖关系表的CRUD接口服务
  3. 定义全局的监听器,其作用在于在每个任务完成后都去父子依赖关系表中查看是否有相关子任务依赖,假如有子任务依赖,则拉起他。

优势:将父子依赖解耦到其他逻辑功能中,便与操控,代码入侵性小,父子任务查询不依赖于Job数据
劣势:影响范围过大,所有任务的执行都会伴随着Mysql的读写操作

持久化到任务数据中

大致步骤如下:

  1. 定义Quartz Job数据中预留两个字段,FATHER_JOBS/CHILDREN_JOBS
  2. 当添加一组父子任务依赖时,需要将父子两个任务的FATHER_JOBS/CHILDREN_JOBS字段信息保存,此时要求父子任务必须开启PersistJobDataAfterExecution注解
  3. 定义全局监听器,作用同上一种方法原理一样

优势:不依赖于其他持久化表,父子任务的依赖查询不依赖于Mysql,性能相对方法一较快
劣势:代码入侵性大,父子任务的依赖添加要直接操作对应的Job类,风险较高(当一个Job正在运行,我们却企图修改Job中的数据),数据耦合严重

最后,建议使用方法1 持久化到Mysql中

10.非Service层获取Spring中的Bean

以前写过相似的功能的文章:SpringMVC中,非controller层 调用服务层(Service)函数,即获得Spring管理的Bean通用方法 ,但下面的方式更加便捷

新建SpringBeanGetter类并实现ApplicationContextAware

展开源码:SpringBeanGetter


@Component
public class SpringBeanGetter implements ApplicationContextAware {
private static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
synchronized (this) {
if (SpringBeanGetter.applicationContext == null) {
SpringBeanGetter.applicationContext = applicationContext;
}
}
}

//获取applicationContext
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

//通过name获取 Bean.
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}

//通过class获取Bean.
public static T getBean(Class clazz) {
return getApplicationContext().getBean(clazz);
}

//通过name,以及Clazz返回指定的Bean
public static T getBean(String name, Class clazz) {
return getApplicationContext().getBean(name, clazz);
}
}

11.XStream使用

12.CountDownLatch的使用场景以及使用

点赞
  1. sunny说道:

    请问下JobListener在集群环境下的问题你是怎么解决的,谢谢!

发表评论