《Java并发编程实战》 线程安全性的委托

words: 6.9k    views:    time: 26min

委托是创建线程安全类的一个最有效策略:只需让现有的线程安全类管理所有的状态即可。Java类库包含丰富的并发构建模块,如线程安全容器以及各种用于协调多个相互协作的线程的同步工具类。

1. 同步容器

同步容器的方式是:将它们的状态封装起来,并对每个公有方法进行同步,使得每次只有一个线程能访问容器的状态,因此同步容器类是线程安全的。

同步容器类是通过其自身的锁来保护其状态访问,如果在某些情况下需要额外的客户端加锁来保护复合操作:如迭代或者条件运算(若没有则添加),可以通过获得容器类的锁,在客户端构造原子操作。

1
2
3
4
5
6
public static Object getLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}

通过在客户端加锁可以解决不可靠的迭代问题,但会导致在迭代期间其他线程无法访问,降低了并发性

1
2
3
4
5
6
//在调用size和get之间存在并发访问
synchronized(vector){
for(int i=0; i<vector.size(); i++){
doSomething(vector.get(i));
}
}

同步容器类的迭代器并没有考虑并发修改的问题,它的策略是“及时失败”,即当它们发现容器在迭代过程中被修改会及时抛出异常ConcurrentModificationException

.集合类源码中有一个modCount计数,它表示当前集合被修改(新增或删除)的次数。在每次创建迭代器时会将迭代器中的expectedModCount置为modCount,如果在后面的迭代过程中发现这两个计数值不相等,则表示在创建完迭代器之后的迭代期间集合发生过改变,于是立即抛出异常。要注意的是,即使在单线程中,也可能抛出异常,如果在集合的迭代器迭代过程中,直接从容器中删除元素就会抛出这个异常。正确的做法是调用迭代器的删除方法iterator.remove(),它会同时修改modCountexpectedModCount,使它们保持一致

在多线程环境中要想避免这个异常,就必须在迭代过程中持有容器的锁。如果不想在迭代期间对容器加锁,一种替代的方法就是克隆容器,在副本上进行迭代,但克隆的过程中仍然要对容器加锁,这种方式的好坏取决于具体场景需求。

另外在调用容器的toStringhsahCodeequalscontainsAllremoveAllretainAll等方法时,以及把容器作为参数的构造函数,都会间接的对容器进行迭代,因此要注意所有这些隐含的迭代操作都可能抛出异常。

2. 并发容器

同步容器将所有对容器状态的访问都串行化,以实现线程安全,这种方法的代价是严重降低了并发性,因此,Java 5.0提供了多种并发容器来改进同步容器的性能。

2.1. ConcurrentHashMap

同步容器在执行每个操作期间都持有一个锁。在一些操作中,比如HashMap.getList.contains可能包含大量的工作,当遍历散列桶或链表来查找某个特定的对象时,必须在许多元素上调用equals,而equals本身也含有一定的计算量。在基于散列的容器中,如果hashCode不能很均匀地分布散列值,那么容器中的元素就不会均匀地分布在整个容器中。极端情况下,某个糟糕的散列函数可能会把一个散列表变成线性链表。当遍历很长的链表并且在某些或者全部元素上调用equals方法时,会花费相当长的时间,而这段时间内其他线程都不能访问该容器。

.可以参考源码分析:https://shanhm1991.github.io/2018/04/02/20180402/ ,java 1.8之后,HashMap改成当链表长度超过一定阈值后尝试用红黑树来代替以降低遍历的深度

HashMap一样,ConcurrentHashMap也是基于散列的Map,但它并不是让每个操作都在同一个锁上同步,使得每次只能有一个线程访问容器,而是用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。这种机制可以提供更高的并发性和伸缩性,任意数量的读线程都可以并发的访问Map,执行读操作的线程与执行写操作的线程可以并发的访问Map,并且一定数量的写线程也可以并发地修改Map,这在并发访问环境下可以实现更高的吞吐量,而在单线程环境中只损失非常小的性能。

ConcurrentHashMap与其他并发容器一起增强了同步容器类,他们提供的迭代器具有弱一致性,不会抛出并发修改异常,也不需要在迭代过程中对容器加锁,弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但不保证)在迭代器被构造后将修改操作反映给容器。

尽管有这些改进,但仍有一些需要权衡的因素。对于一些需要在整个Map上进行计算的方法,比如size()isEmpty(),办法是弱化这些方法的语义以反映容器的并发特性,因为返回的结果可能已经过期了,而只是一个近似值。但这是允许的,事实上size()isEmpty()这样的方法在并发环境下的作用有限,因为它们的返回值总在不断变化,从而这些操作的需求也被弱化了,以换取对其他更重要操作的性能优化,如getputcontainsKeyremove等。

另外,由于ConcurrentHashMap不能被加锁来执行独占访问,因此无法使用客户端加锁来创建新的原子操作(由于我们获取不到想要的锁,因此客户端加锁也就失去意义)。不过一些常见的复合操作,若没有则添加,若相等则去除,若相等则替换等,都已经实现成了原子操作,并声明在ConcurrentMap中。

2.2. CopyOnWriteArrayList

CopyOnWriteArrayList用于替代同步List,在某些情况下可以提供更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。写入时复制容器的线程安全性在于,只要正确地发布一个事实不可变对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于数组的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改所带来的影响。

但是,每当修改容器时都要复制底层数组,这需要一定的开销,尤其当容器的规模较大时。因此,仅当迭代操作远远多于修改操作时,才应该使用写入时复制容器。

2.3. Queue

Queue用来临时保存一组等待处理的元素。常用的实现有ConcurentLinkedQueue,传统的先进先出队列。另外,阻塞队列BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。

生产消费模式中,阻塞队列是常用的方式。其提供可阻塞的put/take方法,以及可定时的offer/poll方法。如果队列已经满了,那么put方法阻塞直到有空间可用;如果队列为空,那么take方法阻塞直到有元素可用;相对的offerpoll则及时返回一个失败状态。队列可以有界也可以无界,无界队列永远都不会充满,因此无界队列的put永远不会阻塞。

生产消费模式将要完成的工作与执行工作两个过程分离开来,这样简化了开发过程,消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。一种最常见的生产者-消费者场景就是线程池与工作队列的组合,在Executor任务执行框架中体现了这种模式。

关于BlockingQueue有几种实现:

  • LinkedBlockigQueueArrayBlockingQueue是FIFO队列,二者区别与LinkedList/ArrayList类似,但比同步List拥有更好的并发性;

  • PriorityBlockingQueue是一个按优先级排序的队列,可以根据元素的自然顺序来比较元素,也可以使用Comparator来比较;

  • SynchronousQueue并不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程等待着把元素加入或移出队列。这种区别就好像将文件直接交给同事还是将文件放到他的邮箱中希望他能尽快拿到文件。因为没有存储功能,因此put/take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。因此仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

3. 示例:桌面搜索

需求:扫描本地驱动器上的文件并建立索引以便随后进行搜索

:FileCrawler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class FileCrawler implements Runnable{

private final BlockingQueue<File> fileQueue;

private final FileFilter fileFilter;

private final File root;

//...

public void run() {
try{
crawl(root);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}

private void crawl(File root) throws InterruptedException{
File[] entries = root.listFiles(fileFilter);
if(entries != null){
for(File entry : entries){
if(entry.isDirectory()){
crawl(entry);
}else if(!alreadyIndexed(entry)){
fileQueue.put(entry);
}
}
}
}
}

class Indexer implements Runnable{

private final BlockingQueue<File> queue;

public Indexer(BlockingQueue<File> queue){
this.queue = queue;
}

public void run(){
try{
while(true){
indexFile(queue.take());
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
}

FileCrawler定义了生产者,在给定目录中搜索符合索引标准的文件,并将它们放入队列。Indexer定义了消费者,从队列中获取文件并对它们建立索引。因此,将遍历文件与建立索引分解成了独立操作,每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流,这样每个功能的代码都更加简单清晰。

.生产消费模式也是利用了线程封闭方法,生产者通过阻塞队列将非线程安全的可变对象的所有权移交给了消费者线程,即安全的发布给了消费者,自始至终对象都只是由单线程拥有,因此拥有该对象的线程可以对其进行任意修改

Java 6.0开始,增加了双端队列容器:DequeBlockingDeque。它们分别对QueueBlockingQueue做了扩展,可以在队列头或者队列尾进行高效插入和移除,具体实现有ArrayDequeLinkedBlockingDeque

双端队列适用于模式:工作密取,在生产消费模式中,所有消费者共享一个工作队列。而在工作密取设计中,每个消费者都有自己的双端队列,如果一个消费者完成了自己队列中的工作,它可以从其他消费者的队列末尾获取工作。这样,密取模式比生产消费模式具有更高的可伸缩性,因为消费者线程不会在单个共享的任务队列上发生竞争

.这与上面的分段锁是一样的想法,即降低在同一个对象锁上面的竞争程度,另外,消费线程从尾部访问另一个队列,而不是从头部获取任务,也能降低队列上一定的竞争程度

密取模式非常适用于既是生产者也消费者问题,有时执行某个任务时可能导致更多的任务,比如网页爬虫,处理一个页面时发现更多的页面需要处理,又比如在垃圾回收阶段对堆进行标记问题。当消费线程发现新的任务时,它可以将其放到自己或其他消费线程的队列末尾。

4. 同步工具

所有的同步工具类都包含一些特定的结构化属性,它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续还是等待。此外还提供了一些方法来对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

4.1. CountDownLatch

闭锁的作用相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭的,没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。但是,当闭锁到达结束状态后,将不会再改变状态,将永远保持打开状态。所以,闭锁一般用来确保某些活动直到其他活动都完成后才继续执行的场景。

.后面在同步工具类的设计中会介绍一种改进版的闭锁:阀门类,其状态可以重复打开和关闭

CountDownLatch是一种灵活的闭锁实现,它包括一个计数器,初始化为一个正数,表示需要等待的事件数量。countDown()方法会递减计数器,表示有一个事件已经发生了,而await()方法一直等待直到计数器为零,表示直到所有等待的事件都已经发生。

:TestHarness
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestHarness {

public long timeTasks(int nThreads, Runnable task) throws InterruptedException{
CountDownLatch startGate = new CountDownLatch(1);
CountDownLatch endGate = new CountDownLatch(nThreads);
for(int i = 0; i < nThreads; i++){
Thread t = new Thread(){
@Override
public void run() {
try{
startGate.await();
try{
task.run();
}finally{
endGate.countDown();
}
}catch(InterruptedException ignored){

}
}
};
t.start();
}

long start = System.nanoTime();
startGate.countDown();
endGate.await();
return System.nanoTime() - start;
}
}

示例中startGate使主线程能够同时释放所有工作线程,确保所有的工作线程同时开始工作而达到并发的目的,endGate则使主线程能够等待最后一个线程执行完成。

4.2. FutureTask

FutureTask实现了Future语义,表示一种抽象的可生成结果的计算,通过Callable来实现。它可以处于三种状态:等待运行,正在运行,和运行完成。运行完成包括所有的可能方式,即正常结束、取消或异常失败,一旦进入完成状态,它将永远停在这个状态上。

.关于FutureTask源码分析,具体可以参考:https://shanhm1991.github.io/2018/04/05/20180405/

FutureTask也可以用作闭锁,Future.get()的行为取决于任务的状态,如果任务已经完成,那么立即返回,否则将一直阻塞直到任务完成,然后返回结果或者抛出异常。显然FutureTask需要确保能够将计算结果从计算线程安全地发布给获取结果的线程。

通常,FutureTask用在Executor框架中表示异步任务,或者用于表示一些耗时计算,它可以将任务的执行与结果的获取分开。即可以先启动任务,然后在需要时再进行结果获取,如果完成了就直接获取,没有完成则进入等待,但也能省去一部分等待时间。

:PreLoader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class PreLoader {

private final FutureTask<Integer> future = new FutureTask<Integer>(new Callable<Integer>(){
@Override
public Integer call() throws Exception {
// execute something
int result = 0;
return result;
}
});

private final Thread thread = new Thread(future);

public void start(){
thread.start();
}

public Integer get(){
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return -1;
}
}

4.3. Semaphore

信号量可以用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。Semaphore中管理着一组虚拟的许可,许可的数量可以通过构造函数来指定,然后在执行操作前首先获得许可,并在使用后释放。如果没有许可,那么acquire将阻塞直到有许可为止。

通常Semaphore可以用来实现一个可阻塞的资源池,比如构造一个固定长度的资源池,将Semaphore的计数值初始化为池的大小,然后从池中获取资源之前先acquire一个许可,并在将资源返回池中之后release一个许可,那么在当池为空或满时,资源的获取和释放将变成阻塞直到执行条件成立,而不是直接失败。

:BoundedSet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class BoundedSet<T> {

private final Set<T> set;

private final Semaphore sem;

public BoundedSet(int bound){
//虽然在许可的获取上使用了同步,但随后对set的操作依然存在竞争的风险
set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}

public boolean add(T t) throws InterruptedException{
sem.acquire();
boolean added = false;
try{
added = set.add(t);
return added;
}finally{
if(!added){
sem.release();
}
}
}

public boolean remove(Object o){
boolean removed = set.remove(o);
if(removed){
sem.release();
}
return removed;
}
}

4.4. CyclicBarrier

栅栏类似于闭锁,也是阻塞一组线程直到某个事件发生,它们的关键区别在于闭锁用于等待事件,而栅栏用于等待其他线程

.换句话说,就是闭锁通常是让A事件线程等待其他B事件线程使某个条件成立,而栅栏是让一堆类似的事件T线程等待彼此到达一个约定的共同事件点

当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达,那么栅栏将打开,此时所有线程都将被释放,而栅栏将被重置,以便下次使用。

如果对await()调用超时或者在await()上阻塞的线程被中断,那么栅栏就认为是被打破了,所有阻塞在await()上线程都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await()将为每个线程返回一个唯一的到达索引,可以利用这些索引来选举一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。

一些场景中,某些步骤中的计算可以并行执行,但必须等待该步骤中的所有计算都完成后才可以进行下一步。例如,在n-body粒子模拟系统中,每个步骤都根据其他粒子的位置和属性来计算各个粒子的新位置。通过在两次步骤之间等待栅栏,可以确保在第K步中的所有更新操作都计算完毕,才进入第K+1步。

  • 生命游戏中通过栅栏来计算细胞的自动化模拟:
:CellularAutomata mark:45,53
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class CellularAutomata {

private final Board mainBoard;

private final CyclicBarrier barrier;

private final Worker[] workers;

public CellularAutomata(Board board){

mainBoard = board;

int cpuCount = Runtime.getRuntime().availableProcessors();

barrier = new CyclicBarrier(cpuCount, new Runnable(){
@Override
public void run() {
mainBoard.commitNewValues();
}
});

workers = new Worker[cpuCount];
for(int i = 0; i < cpuCount; i++){
workers[i] = new Worker(mainBoard.getSubBoard(cpuCount, i));
}
}

public void start(){
for(int i = 0; i < workers.length; i++){
new Thread(workers[i]).start();
}
mainBoard.waitForConvergence();
}

private class Worker implements Runnable{

private final Board board;

public Worker(Board board){
this.board = board;
}

@Override
public void run() {
while(!board.hasConvergence()){
for(int x = 0; x < board.getMaxX(); x++){
for(int y = 0; y < board.getMaxY(); y++){
board.setNewValue(x, y, computeValue(x, y));
}
}

try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
}

CellularAutomata将模拟过程并行化,如果为每个细胞元素分配一个线程是不合适的,过多的线程会导致协调上的开销从而降低性能。所以将细胞分成cpuCount份(由于没有IO过程,当线程数为cpu个数时,吞吐量最高),所以每个Worker分别计算总细胞数 / cpuCount个细胞。

每次当Worker计算完一遍子问题中所有的细胞,便会到达栅栏等待,当所有Worker都到达时便一起通过栅栏,然后一起继续下一轮计算, 直到条件hasConvergence()成立,即问题收敛计算完毕。

另一种形式的栅栏Exchanger,称为两方栅栏,各方在栅栏位置上交换位置。当两方执行不对称的操作时,Exchanger会很有用。例如一个线程向缓存写数据,而另一个线程读取数据,它们可以使用Exchanger来汇合,并将满的缓存与空的缓存交换。关于交换时机的选择,当缓存被填满时由填充任务交换,当缓存为空时由清空任务交换,这样可以将交换的次数将至最低。但如果数据的到达不可预测,数据的处理将被延迟,所以可以在缓存被填充到一定程度并保持一定时间后也进行交换。

5. 示例:构建结果缓存

接口Computable声明一个函数compute,输入类型为A,输出类型为V

:Computable
1
2
3
4
public interface Computable<A, V> {
//耗时运算
V compute(A arg) throws InterruptedException;
}

假设它实现的是一些耗时运算,这里用装饰器来给它的实现增加一个缓存功能,用来记住之前的计算结果以便在接收到相同的输入时能直接返回结果

:Memorizer1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Memorizer1<A, V> implements Computable<A, V>{

public final Map<A, V> cache = new HashMap<>();

public final Computable<A, V> c;

public Memorizer1(Computable<A, V> c){
this.c = c;
}

@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if(result == null){
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}

Memorizer1使用HashMap来缓存结果,并使用同步来保证线程安全性,但存在明显的可伸缩性问题,当排队线程较多时甚至可能出现排队时间超过计算本身所需要的时间。

5.1. 改进:委托ConcurrentHashMap

:Memorizer2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Memorizer2<A, V> implements Computable<A, V>{

public final Map<A, V> cache = new ConcurrentHashMap<>();

public final Computable<A, V> c;

public Memorizer2(Computable<A, V> c){
this.c = c;
}

@Override
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if(result == null){
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}

Memorizer2使用ConcurrentHashMap代替HashMap,有着更好的并发行为。但Memorizer2问题在于,如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行,那么很可能会重复这个计算

5.2. 改进:缓存FutureTask

:Memorizer3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Memorizer3<A, V> implements Computable<A, V>{

public final Map<A, FutureTask<V>> cache = new ConcurrentHashMap<>();

public final Computable<A, V> c;

public Memorizer3(Computable<A, V> c){
this.c = c;
}

@Override
public V compute(final A arg) throws InterruptedException {
FutureTask<V> f = cache.get(arg);
if(f == null){
Callable<V> eval = new Callable<V>(){
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
f = new FutureTask<V>(eval);
cache.put(arg, f);
f.run();
}

try {
return f.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
}

Memorizer2是检查某个计算是否已经结束,而Memorizer3则是检查某个计算是否已经开始。因此它可以表现出更好的并发性,如果结果已经计算出来,将立即返回,如果有其他线程正在计算结果,那么新到的线程将直接等待这个已经开始的计算完成。

但它仍然存在两个线程重复计算的可能,因为if的判断(先检查再执行)并非原子的,可能多个线程同一时间到达compute方法,同时没有获取到FutureTask

5.3. 改进:解决先判断后执行问题

:Memorizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Memorizer<A, V> implements Computable<A, V>{

public final Map<A, FutureTask<V>> cache = new ConcurrentHashMap<>();

public final Computable<A, V> c;

public Memorizer(Computable<A, V> c){
this.c = c;
}

@Override
public V compute(A arg) throws InterruptedException {
FutureTask<V> f = cache.get(arg);
if(f == null){
Callable<V> eval = new Callable<V>(){
@Override
public V call() throws Exception {
return c.compute(arg);
}
};

FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if(f == null){ // f代表之前缓存的FutureTask,如果存在就直接丢弃新建的
f = ft;
f.run();
}
}

try {
return f.get();
} catch(CancellationException e){
cache.remove(arg);
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
}

Memorizer使用原子操作putIfAbsent解决了先检查再执行的问题。但仍有问题需要考虑,当缓存的是Future而不是值时,将可能出现缓存污染。如果某个计算被取消或者失败,在获取结果时将知道计算过程被取消或者失败,那么应该将Future从缓存中移除,这样将来的计算才可能成功。另外Memorizer还需要考虑缓存的逾期和清理等问题。

.在自己实现的一个任务框架中有这样的场景:需要周期性的执行一批任务,每个任务有一个id,但是要求同一时刻,不可以存在相同任务id的任务同时运行。这里就涉及到需要判断缓存中Future状态的问题,仅仅委托ConcurrentHashMap也不能解决问题,考虑到提交任务是个消耗不大的操作,于是就妥协直接使用了同步

:https://github.com/shanhm1991/spring-fom/blob/master/spring-fom/src/main/java/org/springframework/fom/ScheduleContext.java mark:11-14,20,30-31
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static Map<String,TimedFuture<Result<?>>> submitMap = new HashMap<>(512);

private void submitWithConflict(Collection<? extends Task<E>> tasks, ScheduleBatch<E> scheduleBatch){
synchronized(submitMap){
Iterator<? extends Task<E>> it = tasks.iterator();
String taskId = null;
try{
while(it.hasNext()){
Task<E> task = it.next();
taskId = task.getTaskId();
if (isTaskAlive(taskId)) {
logger.warn("task[{}] is still alive, create canceled.", taskId);
continue;
}

task.setScheduleBatch(scheduleBatch);
TimedFuture future = (TimedFuture)submit(task);

it.remove();
submitMap.put(taskId, future);
submitFutures.add(future);
}
} catch (RejectedExecutionException e) {
logger.warn("task[{}] submit rejected, and ignored task={}", taskId, tasks, e);
}
}
}

private boolean isTaskAlive(String taskId){
Future<Result<?>> future = submitMap.get(taskId);
return future != null && !future.isDone();
}


参考:

  1. Copyright ©《java并发编程实战》