任务管理工具spring-fom 设计文档

words: 5.7k    views:    time: 23min

写这个工具最开始只是为了解决各种离线数据文件的处理,其中涉及比较多的任务处理。当时刚进入公司参与第一个项目,但是意识到公司的产品业务基本上都是围绕着数据做一些信息挖掘,尤其是离线数据,所以后续应该还会有很多各种各样的任务式场景。后来参与的各种项目也确实印证了自己的想法,所以确实能在一些项目中发挥作用,提高工作中的开发效率和质量,以及运维的能力。

目标是实现一个轻量级的通用任务管理框架,主要针对的是定时任务。在功能上除了能支持常见的场景外,希望也能支持一定的实时监控管理,而在结构设计上要有足够的扩展性以方便进行定制,同时在使用方式上要简洁清晰,对原代码尽量少的侵入。由于当时是设计用来处理文件的,又是基于spring容器管理的,所以取名为spring-fom。

https://github.com/shanhm1991/spring-fom

1. 思路

主要的思路是基于线程池来实现,将任务的调度逻辑封装在ScheduleContext中。其内部维护了一个调度线程和一个线程池,以及一套自定义的状态转换机制。每个ScheduleContext就相当于一个独立的任务调度器,其生命周期,比如加载、启动、终止,则委托给spring 的应用上下文。对于具体的任务则抽象为 Task,即在 Callable 的基础上定义的一套任务执行模板。所以,整个spring-fom的功能,基本就是围绕 ScheduleContextTask 来实现的。

下面主要从定时任务的角度进行说明,当然如果不是定时任务,也可以将其简单作为一个线程池来提交任务使用。区别只是在于任务的创建和提交方式不同,前者由内部调度线程负责创建和提交,而后者来自外部线程,至于任务的执行则统一委托给内部线程池。

1.1. 功能设计

主要特性:

1. 支持常用的定时策略:cron | fixedRate | fixedDelay
2. 支持批任务
    实现接口 ScheduleFactory 可以创建批任务,通过 @Scheduled 可以指定多个任务方法;
    实现接口 CompleteHandler 可以自定义批任务结束处理
3. 支持任务超时检测
    通过 taskOverTime 可以设置任务超时时间,通过 detectTimeoutOnEachTask 可以设置是对整体还是单个任务计算超时;
    实现接口 TaskCancelHandler 可以自定义任务超时的取消处理,默认进行 Interrupt 中断;
4. 支持任务冲突检测
    每个任务都有一个id,开启 enableTaskConflict 后如果提交任务时,发现已经存在对应id的任务,并且还在运行,则忽略本次任务;
5. 支持实时监控管理:http://[ip]:[port]/[context-path]/fom
    5.1. 可以实时查看定时器状态、任务线程堆栈、以及执行统计等信息
    5.2. 可以实时启动 / 终止定时器
           实现接口 TaskCancelHandler 可以自定义定时器终止时任务的取消处理,默认通过 Interrupt 中断
           实现接口 TerminateHandler 可以自定义定时器终止时的处理,比如清理释放资源
    5.3. 可以实时触发执行任务,比如当定时任务未到执行时机时,可以手动触发使其立即执行
           实时触发任务支持两种策略:如果任务正在执行则直接忽略(默认),或者等待本轮执行完成后立即再重新执行
    5.4. 可以实时修改任务配置,支持注入属性,也做了本地持久化,这样重启后修改不会丢失

1.2. 配置

对应上面的功能,下面列举一下相关的配置:

任务配置:@Fom 默认值: 说明: 生效:
cron null cron计划 下轮任务执行
fixedRate 0 距离上次任务开始的时间(毫秒) 下轮任务执行
fixedDelay 0 距离上次任务结束的时间(毫秒) 下轮任务执行
enable true 是否在加载时启动(startup) 任务重启
execOnLoad false 是否在启动后立即执行 任务重启
initialDelay 0 任务在启动后首次执行的延时时间(毫秒) 下轮任务执行
deadTime 0 任务截止时间,超过截止时间后任务不再执行 立即生效
threadCore 1 核心线程数,如果设置时core > max,则将max置为core 立即生效
threadMax 1 最大线程数,如果设置时max < core,则将core置为max 立即生效
threadAliveTime 10 线程最大空闲时间(毫秒) 立即生效
queueSize 256 任务队列长度 任务重启
taskOverTime 0 任务超时时间(毫秒) 下轮任务执行
enableTaskConflict false 是否检测任务冲突(根据任务id) 任务重启
detectTimeoutOnEachTask true 是否针对每个任务单独检测超时 下轮任务执行
ignoreExecRequestWhenRunning true Running状态时,是否忽略触发执行请求 立即生效
remark null 备注说明 立即生效
启动开关:@EnableFom 默认值: 说明:
enableFomView true 是否启用管理界面
环境变量: 默认值: 说明:
spring.fom.cache.type file 配置持久化方式,暂时只有本地文件保存
spring.fom.cache.history false 是否保存历史配置修改,默认只保存最新配置
spring.fom.cache.file.path cache/schedule 保存文件路径

1.3. 状态定义

对于ScheduleContext中维护的状态,其转换机制可以如下图表示:

  • 如果处于状态:initedstopped

可以接收外部信号start,然后启动调度线程,并将状态切换为 running

  • 如果处于状态:running

正常情况下,调度线程会等待任务结束后将状态切换为 sleeping,如果没有设置定时计划(一次性任务),则切换为 inited;

可以接收外部信号stop,由外部线程将状态切换为 stopping,并请求中断调度线程。然后调度线程会跳过等待任务结束的过程,并及时检测到 stopping 状态,接着尝试取消还在执行的任务,关闭线程池,并在所有任务真正结束后将状态切换为 stopped;

可以接收外部信号run,默认会被忽略,如果将ignoreExecRequestWhenRunning设置为false,那么将在本轮任务结束之后会立即再重新执行;

  • 如果处于状态:sleeping

正常情况下,调度线程会处于sleep状态直到下一轮执行时机,将状态切换为 running;

可以接收外部信号run,由外部线程中断调度线程的等待,然后调度线程立即开始下一轮任务执行,并将状态切换为 running;

可以接收外部信号stop,也是由外部线程中断调度线程的等待,但在这之前,外部线程会将状态切换为 stopping,然后调度线程在开始下一轮任务之前会检测到这个 stopping 状态,接着会去关闭线程池,并将状态切换为 stopped;

  • 如果处于状态:stopping

此时忽略一切外部信号,直到调度线程等待所有任务结束之后,由调度线程将状态切换为 stopped;

1.4. 调度线程

对于ScheduleContext中的调度线程,根据上面的状态描述,可以大概画出其执行流程如下:

2. 具体实现

2.1. 加载启动

  • 加载

任务的加载和启动完全委托给spring应用上下文,首先定义一个注解@Fom继承@Component,这样spring容器就会帮忙加载所有@Fom

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/annotation/Fom.java
1
2
3
4
5
6
7
8
9
10
11
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Fom {

@AliasFor(annotation = Component.class)
String value() default "";

// ... ...
}

通过接口ImportBeanDefinitionRegistrar可以拿到spring容器中所有的 BeanDefinition,然后过滤出那些标识了@Fom的类,并注册一个对应的ScheduleContext定义,至于beanName,就在原目标类的beanName之前加一个$

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/FomBeanDefinitionRegistrar.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class FomBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar{

private static AtomicBoolean registed = new AtomicBoolean(false);

@Override
public void registerBeanDefinitions(AnnotationMetadata meta, BeanDefinitionRegistry registry) {
if(!registed.compareAndSet(false, true)){
return;
}

AnnotationAttributes attrs = AnnotationAttributes.fromMap(meta.getAnnotationAttributes(EnableFom.class.getName()));
if((boolean)attrs.get("enableFomView")){
// FomController
RootBeanDefinition fomController = new RootBeanDefinition(FomController.class);
registry.registerBeanDefinition("fomController", fomController);

// FomService
RootBeanDefinition fomService = new RootBeanDefinition(FomServiceImpl.class);
registry.registerBeanDefinition("fomService", fomService);
}

// FomBeanPostProcessor
RootBeanDefinition fomBeanPostProcessor = new RootBeanDefinition(FomBeanPostProcessor.class);
registry.registerBeanDefinition("fomBeanPostProcessor", fomBeanPostProcessor);

// FomStarter
RootBeanDefinition fomStarter = new RootBeanDefinition(FomStarter.class);
registry.registerBeanDefinition("fomStarter", fomStarter);

// FomExternalRegister
RootBeanDefinition fomExternalRegister = new RootBeanDefinition(FomExternalRegister.class);
registry.registerBeanDefinition("fomExternalRegister", fomExternalRegister);

// 注册FomBeanDefinition
String[] beanNames = registry.getBeanDefinitionNames();
Class<?> clazz;
for(String beanName : beanNames){
BeanDefinition beanDefinition = registry.getBeanDefinition(beanName);
String className = beanDefinition.getBeanClassName();
if(className != null){
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new ApplicationContextException("", e);
}
FomSchedule fomSchedule = clazz.getAnnotation(FomSchedule.class);
if(fomSchedule != null){
parseFomSchedule(beanName, clazz, beanDefinition, fomSchedule, registry);
}
}
}
}

public void parseFomSchedule(String beanName, Class<?> clazz, BeanDefinition beanDefinition, FomSchedule fomSchedule, BeanDefinitionRegistry registry){
if(ScheduleContext.class.isAssignableFrom(clazz)){
beanDefinition.getPropertyValues().add("scheduleName", beanName);
registry.registerAlias(beanName, "$" + beanName);
}else{
RootBeanDefinition fomBeanDefinition = new RootBeanDefinition(ScheduleContext.class);
fomBeanDefinition.getPropertyValues().add("scheduleBeanName", beanName);
fomBeanDefinition.getPropertyValues().add("scheduleName", "$" + beanName);
registry.registerBeanDefinition("$" + beanName, fomBeanDefinition);
}
}
}

通过继承spring的注解Import来触发ImportBeanDefinitionRegistrar,并且可以在实现方法中获取到注解中定义的属性

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/annotation/EnableFom.java
1
2
3
4
5
6
7
8
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(FomBeanDefinitionRegistrar.class)
public @interface EnableFom {

boolean enableFomView() default true;
}

以上已经完成了对所有ScheduleContext实例的加载,但我们真正希望执行的是标识了Fom的目标类中的方法。这里可以通过动态代理,也就是创建一个代理对象来改变ScheduleContext中一些方法的执行过程,并使用这个代理对象来代替ScheduleContext实例注册到spring容器中去,具体在BeanPostProcessor中进行处理即可。

上面在注册BeanDefinition时注入了一个属性scheduleBeanName,也是为了用来记住代理的目标实例,方便获取其Fom注解信息,具体的代理实现在ScheduleProxy中,细节有些繁琐,这里也不过多说明

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/FomBeanPostProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class FomBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware, EmbeddedValueResolverAware {

// ... ...

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> clazz = bean.getClass();
if(!(ScheduleContext.class.isAssignableFrom(clazz))){
return bean;
}

ScheduleContext<?> scheduleContext = (ScheduleContext<?>)bean;
if(scheduleContext.isExternal()) {
return processExternal(scheduleContext, beanName);
}

String scheduleBeanName = scheduleContext.getScheduleBeanName();
FomSchedule fomSchedule = scheduleContext.getClass().getAnnotation(FomSchedule.class);

// @Bean不处理
if(StringUtils.isEmpty(scheduleBeanName) && fomSchedule == null){
scheduleContext.setScheduleName(beanName);
scheduleContext.setLogger(LoggerFactory.getLogger(scheduleContext.getClass()));
return bean;
}

Object scheduleBean = null;
if(!StringUtils.isEmpty(scheduleBeanName)){
scheduleBean = beanFactory.getBean(scheduleBeanName);
}

// 设置Logger
fomSchedule = clazz.getAnnotation(FomSchedule.class);
if(fomSchedule == null){
fomSchedule = scheduleBean.getClass().getAnnotation(FomSchedule.class);
scheduleContext.setLogger(LoggerFactory.getLogger(scheduleBean.getClass()));
}else{
scheduleContext.setLogger(LoggerFactory.getLogger(clazz));
}

// 加载配置
ScheduleConfig scheduleConfig = scheduleContext.getScheduleConfig();
if(fomSchedule != null){ // 注解
setCronConf(scheduleConfig, fomSchedule, scheduleContext, scheduleBean);
setOtherConf(scheduleConfig, fomSchedule);
setValue(scheduleConfig, scheduleContext, scheduleBean);
}else{ // xml配置 TODO

}

// 加载缓存配置(修改内容持久化,覆盖上面加载的配置)
try {
loadCache(beanName, scheduleContext);
} catch (Exception e) {
throw new ApplicationContextException("", e);
}

// 刷新配置
scheduleConfig.refresh();

// 创建代理 注册容器
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(clazz);
enhancer.setCallback(new ScheduleProxy(beanName, scheduleContext, fomSchedule, scheduleBean));
return enhancer.create();
}
}
  • 启动

相比于加载过程,启动就非常简单了,直接实现接口Lifecycle委托给spring

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/FomStarter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class FomStarter implements SmartLifecycle, ApplicationContextAware {

// ... ...

@SuppressWarnings("rawtypes")
@Override
public void start() {
String[] scheduleNames = applicationContext.getBeanNamesForType(ScheduleContext.class);
for(String scheduleName : scheduleNames){
ScheduleContext<?> schedule = (ScheduleContext)applicationContext.getBean(scheduleName);
ScheduleConfig config = schedule.getScheduleConfig();
if(config.getBoolean(FomSchedule.ENABLE, true)){
schedule.scheduleStart();
logger.info("load and start schedule[{}]: {}", scheduleName, schedule.getScheduleConfig().getConfMap());
}else{
logger.info("load schedule[{}]: {}", scheduleName, schedule.getScheduleConfig().getConfMap());
}
}
}

@SuppressWarnings("rawtypes")
@Override
public void stop() {
String[] scheduleNames = applicationContext.getBeanNamesForType(ScheduleContext.class);
for(String scheduleName : scheduleNames){
ScheduleContext<?> schedule = (ScheduleContext)applicationContext.getBean(scheduleName);
schedule.scheduleStop();
}
}
}

2.2. 任务执行

ScheduleContext中维护了一个私有的线程池,所有具体任务的执行都委托给了这个线程池,上面所说的状态基本上也是线程池对应的状态,调度线程只需要负责任务的创建和提交。那么问题来了,如何检测提交的任务在什么时候执行完成,尤其是提交了多个任务之后。

简单想法是由调度线程在提交任务时记录一个任务数,然后由工作线程在任务结束时将计数减1,如果减完后计数为0,则表示任务全部完成。

但在实现中,发现在提交时这个任务的实际提交数不好直接确定,比如我们想检测任务冲突,那么只能进行遍历提交,每个任务在提交时检测其id,如果已经存在则直接跳过,于是就在每个任务实际提交成功后将任务计数加1。(当然也可以对整个提交过程上锁,以便获得一个最终提交数,但这样锁的持有时间相对就变长了,而且也没能直接利用ConcurrentMap提供的线程安全特性)

由于调度线程在提交任务将计数加1的同时,工作线程也在执行任务并将计数减1,于是问题就变复杂了,要考虑的不仅仅是计数变量的线程安全问题,如果任务执行得足够快,可能出现在提交过程中计数被减为0,导致工作线程误以为任务已经全部完成,但实际上还有任务没提交的场景。

所以要保证在检测任务是否全部完成时,提交任务这个动作已经结束。这里可以添加一个标识,由调度线程在提交结束后将其置为真,然后工作线程在任务结束并将计数减1后,除了要检测任务计数是否为0,还要再检测下这个标识,都为真才表示任务已经全部完成。

但是这样又引入了新的问题,由于调度线程提交完任务,并将提交结束标识置为真,两个操作之间并没有同步。可能出现这样的场景:调度线程已经提交完任务,并且工作线程全部执行结束,但在工作线程检测计数是否为0时,调度线程还没来得及将提交结束标识置为真,那么工作线程就会以为还有任务没有完成。所以,在调度线程提交完任务之后,也应该检测下任务是否已经全部完成,这样如果工作线程错过了检测时机,可以由调度线程来进行把关。在这个时机,我们可以做一些事情,比如希望在提交的任务全部完成时执行某个动作,至于是哪个线程执行并不重要,谁最后进行检测就由谁来执行。

对于上面提及的两个变量:任务提交数和提交结束标识,以及相关的操作,都封装在CompleteLatch中:

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/ScheduleContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static class CompleteLatch<E> {

// 任务是否提交结束
private volatile boolean hasSubmitCompleted = false;

// 还没有结束的任务数
private final AtomicInteger taskNotCompleted = new AtomicInteger(1);

// 闭锁,等待任务全部提交并执行结束
private final CountDownLatch latch = new CountDownLatch(1);

// ... ...

public void submitCompleted(){
hasSubmitCompleted = true;
}

public boolean hasSubmitCompleted(){
return hasSubmitCompleted;
}

public void taskCompleted(){
latch.countDown();
}

public boolean waitTaskCompleted(long taskOverTime) throws InterruptedException{
return latch.await(taskOverTime, TimeUnit.MILLISECONDS);
}

public void waitTaskCompleted() throws InterruptedException{
latch.await();
}

public long increaseTaskNotCompleted(){
return taskNotCompleted.incrementAndGet();
}

public boolean hasTaskCompleted(){
return taskNotCompleted.decrementAndGet() == 0;
}

public long getTaskNotCompleted(){
return taskNotCompleted.get();
}
}

根据CompleteLatch提供的操作,可以画出下图来示意调度线程的主要流程,提交任务、等待任务结束等

2.3. 超时检测

CompleteLatch中重载了一个限时等待操作waitTaskCompleted(long taskOverTime),目的是为了实现任务超时检测。超时检测由调度线程负责,主要思路是借助延时队列DelayQueue,通过Delayed来封装任务对应的future

当调度线程提交结束后,首先限时等待一个给定的超时时间overTime,如果等完还有任务没有结束,那么获取这些任务的耗时。如果已经超时则尝试取消,对于还没有超时的,计算一下任务剩余的可用时间,然后封装成Delayed放入延时队列。接下来就是反复从延时队列中获取一个时间最近的任务来检测,如果超时了就取消,否则重新计算剩余时间再放回队列。

如果最后队列为空,就表示所有的任务都已经结束或者超时。但是超时的任务并一定就结束了,虽然调度线程已经尝试过对执行任务的工作线程进行中断,但如果任务不响应也没办法,所以最后还是通过一个不限时的等待waitTaskCompleted()来进行兜底。

省去一些不相关的代码,主要实现可以如下:

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/ScheduleContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private void waitTaskCompleted(CompleteLatch<E> completeLatch){

// ... ...

long overTime = scheduleConfig.getTaskOverTime();

if(completeLatch.waitTaskCompleted(overTime)){
cleanCompletedFutures();
}else{
DelayQueue<TaskDelayed> delayQueue = new DelayQueue<>();
for(TimedFuture<Result<E>> future : submitFutures){
waitTaskFuture(future, delayQueue, overTime);
}

while(!delayQueue.isEmpty()){
TaskDelayed taskDelayed = delayQueue.take();
waitTaskFuture(taskDelayed.getFuture(), delayQueue, overTime);
}

long taskNotCompleted = completeLatch.getTaskNotCompleted();
if(taskNotCompleted > 0){
logger.warn("some[{}] tasks cancel fails, which may not respond to interrupts.", taskNotCompleted);
completeLatch.waitTaskCompleted();
}
cleanCompletedFutures();
}

// ... ...
}

private void waitTaskFuture(TimedFuture<Result<E>> future, DelayQueue<TaskDelayed> delayQueue, long overTime){
if(!future.isDone()) {
long startTime = future.getStartTime();
if(startTime == 0){ // startTime = 0 表示任务还没启动
delayQueue.add(new TaskDelayed(future, overTime));
}else{
long cost = System.currentTimeMillis() - future.getStartTime();
if(cost >= overTime){
try{
handleCancel(future.getTaskId(), cost);
}catch(Exception e){
logger.error("", e);
}

logger.info("cancle task[{}] due to time out, cost={}ms", future.getTaskId(), cost);
future.cancel(true);
}else{
delayQueue.add(new TaskDelayed(future, overTime - cost));
}
}
}
}

2.4. 任务关闭

根据Java中的线程机制,如果从外部取消线程,应该通过中断标识来进行通知,由目标线程自行决定在何时、以及使用何种方式结束自己。所以在关闭任务时,外部线程只做两件事:将状态置为 stopping,然后给调度线程发送中断请求

对于外部的关闭请求,调度线程只有在处于 running 或 sleeping 状态时会进行响应处理:

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/ScheduleContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public Response<Void> scheduleShutdown(){
synchronized (this) {
switch(state){

// ... ...

case RUNNING:
case SLEEPING:
state = STOPPING;
scheduleThread.interrupt(); //尽快响应
if(scheduleConfig.getPool().isTerminated()){
state = STOPPED;
isFirstRun = true;
}
logger.info("schedule[{}] will stop soon.", scheduleName);
return new Response<>(Response.SUCCESS, "schedule[" + scheduleName + "] will stop soon.");

// ... ...
}
}
}

调度线程在检测到中断请求后,会尽快重新检查状态,如果为 stopping,那么立即进行关闭清理操作,即shutdown线程池,然后等待线程池结束。

下面通过一段伪代码来描述ScheduleContext中调度线程如何处理关闭操作的,即在 running 和 sleeping 状态下如何响应关闭请求

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/ScheduleContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private class ScheduleThread extends Thread {

@Override
public void run() {
while(true){
if(state == STOPPING) {
terminate();
return
}

state = RUNNING

submit and execute tasks ...
try{
waitTaskCompleted ...
}catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 保留中断请求,后面检测处理
}

state = SLEEPING

try {
wait(waitTime);
} catch (InterruptedException e) {
// 响应中断:结束等待,并立即重新检测state
}
}
}
}

3. 其它

3.1. 关于统计

ScheduleContext中会对执行的任务进行统计,比如成功失败数、等待数、正在执行数以及结果等。这些都封装ScheduleStatistics中,由每个Task在结束时自行更新统计结果。

可以实现接口ResultHandler自定义任务结果的处理,比如持久化到数据库,默认是在内存中滚动保存7天的统计数据。

3.2. 关于配置

ScheduleContext中的所有配置通过ScheduleConfig来封装,其内部也是委托给了ConcurrentMap进行管理,然后对get/put调用做了一下代理,但屏蔽了remove操作,也就是说可以实时新增或修改配置,但不允许删除操作。

对于配置的修改,会进行持久化,以保证重启后修改不会丢失,也会尝试对@Value进行属性注入,但是只限于当前ScheduleContext中的属性。如果想在任务代码中对配置进行修改,可以通过注入ScheduleService来实现,其提供获取和修改的操作

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/support/service/ScheduleService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface ScheduleService {

// 序列化当前schedule的配置
public void serializeCurrent();

// 序列化指定schedule配置
public void serialize(@NotBlank(message = "scheduleName cannot be empty.") String scheduleName);

// 设置当前schedule的配置
public void putCurrentConfig(String key, Object value);

// 设置指定schedule的配置
public void putConfig(@NotBlank(message = "scheduleName cannot be empty.") String scheduleName, String key, Object value);

// 获取当前schedule的配置
public <V> V getCurrentConfig(String key);

// 获取指定schedule的配置
public <V> V getConfig(@NotBlank(message = "scheduleName cannot be empty.") String scheduleName, String key);
}

3.3. 关于日志

对于日志,通过slf4j进行创建,这样就不依赖于具体的日志实现,比如log4j、log4j2、或logback,然后复用了下spring-boot-starter-actuator中的日志监控模块,实现了任务日志级别实时修改的功能,并在其基础上增加了对log4j的适配。

另外,对于ScheduleContext的日志,默认会使用标识了@Fom的目标类来进行初始化,方便将任务的日志与目标类中其它的日志打到一起,如2.1

1
scheduleContext.setLogger(LoggerFactory.getLogger(scheduleBean.getClass()));

3.4. 分布式支持

支持分布式需要借助数据库,类似quartz的思路,将任务元数据放到统一的存储中,然后通过数据库的锁机制保证具体任务只会被一个任务进程中加载。

这里并没有将分布式支持作为目标,还没有遇到那种海量的任务需要通过起多个进程来分摊执行任务的场景,主要目标是轻量简单,但是又希望能比spring-schedule更丰富一些。当然也提供了一个静态入口:

https://github.com/shanhm1991/spring-fom/blob/master/src/main/java/org/springframework/fom/FomExternalRegister.java
1
public static void regist(String scheduleName, Class<?> scheduleClass, ScheduleConfig scheduleConfig)

可以将任务信息存到数据库,然后自行拼装配置信息进行注入加载启动,但这只是个潦草的解决办法,还要自己解决任务信息获取的竞争问题


参考: