任务管理工具spring-fom 使用示例

words: 2.2k    views:    time: 10min

根据设计文档中的描述,这里对一些常见的使用场景进行示例说明

https://github.com/shanhm1991/spring-fom-demo

1. 使用方式

  • 依赖
pom.xml
1
2
3
4
5
<dependency>
<groupId>io.github.shanhm1991</groupId>
<artifactId>spring-fom</artifactId>
<version>2.7.0</version>
</dependency>
  • 声明:@EnableFom

需要在spring的组件上通过@EnableFom进行声明,因为spring-fom 的依赖于spring应用上下文

Main.java
1
2
3
4
5
6
7
8
@EnableFom(enableFomView=false)
@Configuration
public class Main {
@SuppressWarnings("resource")
public static void main(String[] args) {
new AnnotationConfigApplicationContext("org.springframework.fom.test");
}
}
  • 定义任务:@Fom

比如定义一个简单的定时任务

SimpleSchedule.java
1
2
3
4
5
6
7
8
@Fom(cron = "0 */1 * * * ?")
public class SimpleSchedule {

@Schedule
public void schedule() {
// do something
}
}

2. 定时场景

2.1. 单任务 SingleSchedule

单个定时任务比较简单,这里演示一下对于ScheduleService的使用,实现在任务过程中对配置的获取和修改

示例中,配置项config.email可以在界面进行修改和注入,在任务的执行过程中,也可以手动新增修改配置项config.sleep.time,然后在管理界面上同样可以查看和修改

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/schedules/single/SingleSchedule.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Fom(cron = "0 */1 * * * ?", remark = "定时单任务")
public class SingleSchedule {

private static final Logger LOG = LoggerFactory.getLogger(SingleSchedule.class);

private final Random random = new Random();

@Value("${config.email:shanhm1991@163.com}")
private String email;

@Autowired
private ScheduleService scheduleService;

@Schedule
public long exec() throws InterruptedException{
Integer sleepTime = scheduleService.getCurrentConfig("config.sleep.time");
if(sleepTime != null){
LOG.info("executing sleep..., email={}", email);
Thread.sleep(sleepTime);
}

sleepTime = random.nextInt(5000);
scheduleService.putCurrentConfig("config.sleep.time", sleepTime);
scheduleService.serializeCurrent();
return sleepTime;
}
}

2.2. 多任务 MultiSchedule

使用@Fom标识一个类,表示启用一个调度计划,使用@Schedule标识其中的方法,则表示以这个方法作为任务来执行。可以标识多个方法,然后这些任务共享同一个调度计划。在有的场景中,可能存在不同的任务,但他们的定时计划相同

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/schedules/multi/MultiSchedule.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Fom(fixedRate = 120000, threadCore = 2, remark = "定时多任务")
public class MultiSchedule {

private static final Logger LOG = LoggerFactory.getLogger(MultiSchedule.class);

private final Random random = new Random();

@Schedule
public long task1() throws InterruptedException{
long sleep = random.nextInt(5000);
LOG.info("task executing ...");
Thread.sleep(sleep);
return sleep;
}

@Schedule
public String task2() throws InterruptedException{
LOG.info("task executing ...");
Thread.sleep(random.nextInt(5000));
return "test";
}
}

2.3. 批任务 BatchSchedule

实现接口ScheduleFactory,可以用集合的方式来提交一批任务,但是对于任务的抽象需要继承给定的Task。有的场景中,可能希望定时执行一批任务,并且这些任务有统一的抽象

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/schedules/batch/BatchSchedule.java
1
2
3
4
5
6
7
8
9
10
11
12
@Fom(fixedDelay = 180000, threadCore = 4, remark = "定时批任务")
public class BatchSchedule implements ScheduleFactory<Long> {

@Override
public List<BatchTask> newSchedulTasks() throws Exception {
List<BatchTask> list = new ArrayList<>();
for(int i = 1; i <= 10; i++){
list.add(new BatchTask(i));
}
return list;
}
}

2.4. 任务结束事件 CompleteSchedule

实现接口CompleteHandler,可以在任务全部结束时,执行一些自定义事件,并且在执行过程可以拿到一些信息,比如执行的次数、本次执行时间、以及所有任务的结果,这在multi多任务或者batch批任务场景中比较有用

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/schedules/complete/CompleteSchedule.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Fom(cron = "0 0/5 * * * ?", threadCore = 4, remark = "自定义任务结束处理")
public class CompleteSchedule implements ScheduleFactory<Long>, CompleteHandler<Long> {

private static final Logger LOG = LoggerFactory.getLogger(CompleteSchedule.class);

@Override
public List<CompleteTask> newSchedulTasks() throws Exception {
List<CompleteTask> list = new ArrayList<>();
for(int i = 1; i <= 10; i++){
list.add(new CompleteTask());
}
return list;
}

@Override
public void onComplete(long times, long lastTime, List<Result<Long>> results) throws Exception {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastExecTime);
LOG.info( "onComplete:第{}次在{}提交的任务全部完成,结果省略...", times, date);
}
}

2.5. 任务超时检测 TimeoutSchedule

通过taskOverTime可以给任务指定超时时间,这样当任务超时后调度线程会尝试对其进行中断取消

默认是对每个任务单独进行超时检测,如果想对整体任务进行超时检测,可以设置detectTimeoutOnEachTask为false。这样将从第一个任务开始执行时计算时间,等到超时后就取消还没结束的任务,不管这个任务实际耗时多久,就算它还没有开始执行也会被取消。

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/schedules/timeout/TimeoutSchedule.java
1
2
3
4
5
6
7
8
9
10
11
12
@Fom(cron = "0 0/7 * * * ?", threadCore = 4, taskOverTime = 3500, remark = "任务超时检测")
public class TimeoutSchedule implements ScheduleFactory<Long>{

@Override
public List<TimeoutTask> newSchedulTasks() throws Exception {
List<TimeoutTask> list = new ArrayList<>();
for(int i = 1; i <= 15; i++){
list.add(new TimeoutTask());
}
return list;
}
}

2.6. 任务取消事件 CancelSchedule

实现接口TaskCancelHandler可以自定义任务的取消策略,对于取消策略的调用有两个时机:检测到任务超时,或者被外部shutdown关闭。如果想自定义shutdown之后的处理,比如释放一些资源,也可以实现接口TerminateHandler

有的场景中,可能要取消的任务无法响应中断,比如等待套接字返回,或者无限循环,此时只能通过关闭连接,或增加循环标志等办法来结束

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/schedules/cancel/CancelSchedule.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Fom(cron = "0 0/11 * * * ?", execOnLoad = true, taskOverTime = 300000, remark = "自定义任务取消事件")
public class CancelSchedule implements TaskCancelHandler, TerminateHandler {

private static final Logger LOG = LoggerFactory.getLogger(CancelSchedule.class);

private volatile boolean off = false;

@Schedule
public void exec() {
LOG.info("task executing ...");
while(!off){
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
//ignore
}
}
}

@Override
public void handleCancel(String taskId, long costTime) {
off = true;
LOG.info("handleCancel:任务被取消{},已经耗时{}ms", taskId, costTime);

}

@Override
public void onTerminate(long execTimes, long lastExecTime) {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastExecTime);
LOG.info("onTerminate:任务关闭,共执行{}次任务,最后一次执行时间为{}", execTimes, date);
}
}

对于定义的Task,如果实现了接口TaskCancelHandler,同样也会在超时或者 shutdown 时进行调用,比如示例2.5中定义的任务

:https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/schedules/timeout/TimeoutTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TimeoutTask extends Task<Long> implements TaskCancelHandler {

@Override
public Long exec() throws InterruptedException {
long sleep = new Random().nextInt(10000);
logger.info("task executing ...");
Thread.sleep(sleep);
return sleep;
}

@Override
public void handleCancel(String taskId, long costTime) throws Exception {
logger.info("task handleCancel:取消任务[{}],耗时={}ms", taskId, costTime);
}
}

3. 非定时场景

@Fom不要求必须要有定时计划,如果没有定时计划,也可以当成一个线程池来使用,同样支持一些接口功能,并对外提供提交任务的接口

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/other/FomExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Fom(threadCore = 4, taskOverTime = 5000, enableTaskConflict = true, detectTimeoutOnEachTask = true)
public class FomExecutor implements ResultHandler<Long>, TaskCancelHandler, CompleteHandler<Long> {

private static final Logger LOG = LoggerFactory.getLogger(FomExecutor.class);

@Override
public void handleResult(Result<Long> result) throws Exception {
LOG.info("handleResult:统计任务[{}]的结果:{}", result.getTaskId(), result.getContent());
}

@Override
public void handleCancel(String taskId, long costTime) throws Exception {
LOG.info("handleCancel:取消任务[{}],耗时={}ms", taskId, costTime);
}

@Override
public void onComplete(long times, long lastTime, List<Result<Long>> results) throws Exception {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastTime);
LOG.info("onComplete: 第{}次在{}提交的任务全部执行结束,结果数:{}", times, date, results.size());
}
}

在使用时要注意实际注入类型是ScheduleContext

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/other/ExecController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/exec")
public class ExecController {

@Autowired
private ScheduleContext<Long> $fomExecutor;

@RequestMapping("/submit")
public void submit(String tag) {
List<Task<Long>> tasks = new ArrayList<>();
for(int i = 0; i < 10; i++){
tasks.add(new OtherTask(tag + i));
}
$fomExecutor.submitBatch(tasks);
}
}

4. 任务管理服务

FomService中提供了一些接口支持任务的实时监控管理功能,如果接口功能不能满足,也可以直接获取ScheduleContext实例进行操作

https://github.com/shanhm1991/spring-fom-demo/blob/main/src/main/java/org/springframework/fom/demo/other/ExecController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
@RequestMapping("/exec")
public class ExecController {

@Autowired
private FomService fomService;

@Autowired
private List<ScheduleContext<?>> schedules;

/**
* 获取任务列表
*/
@RequestMapping("/list")
public List<ScheduleInfo> list(){
return fomService.list();
}

/**
* 获取任务计数
*/
@RequestMapping("/count")
public Integer count(){
return schedules.size();
}
}

另外,提供了一个任务管理界面:http://[ip]:[port]/[context-path]/fom, 当然这些界面功能的接口也都是FomService提供的,比如

  • 任务列表

  • 任务配置详情

  • 成功任务统计

  • 执行中任务的堆栈信息

对于失败和等待中的任务,也可以查看异常信息、创建时间等信息,这里不再赘述…


参考: