JUC FutureTask

———— JDK 1.8
words: 2.6k    views:    time: 11min

前面整理线程池时说过Executor是执行任务的抽象,而Runnable是任务的抽象。不过,线程池在实际提交任务时,会先将任务包装成一个FutureTask,以实现Future的语义,即异步获取任务结果的能力FutureTask内部实际依赖的是一个Callable,并将任务的执行过程和结果获取委托给它,如果提交的是Runnable,那么也可以通过适配器RunnableAdapter<V>将其适配成一个Callable

java.util.concurrent.Executors
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

相关类图可以如下所示:


1. 属性

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
8
9
private volatile int state; // 任务状态

private Callable<V> callable; // 待执行的任务

private Object outcome; // 任务执行的结果或异常,非volatile字段,通过期前后state状态的设置来保证线程安全

private volatile Thread runner; // 执行任务的线程,执行run方法的线程首先尝试CAS设置runner,成功才有执行资格

private volatile WaitNode waiters; // 阻塞在Future.get上的线程

1.1. state 状态

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
8
9
10
11
12
13
// Possible state transitions:
// NEW -> COMPLETING -> NORMAL
// NEW -> COMPLETING -> EXCEPTIONAL
// NEW -> CANCELLED
// NEW -> INTERRUPTING -> INTERRUPTED

private static final int NEW = 0; // 新建
private static final int COMPLETING = 1; // 执行完成之后,设置结果之前的一个中间状态,表示正在完成
private static final int NORMAL = 2; // 正常完成
private static final int EXCEPTIONAL = 3; // 异常结束
private static final int CANCELLED = 4; // 已被取消
private static final int INTERRUPTING = 5; // 中断之前设置的一个状态,表示正在中断,中断之后置为INTERRUPTED
private static final int INTERRUPTED = 6; // 已被中断

对于任务,设置了以上7种状态,其状态转换机制如注释所示,但如果不作细分的话,三个状态就可以区分任务的进度了

  • NEW:表示任务新建以及正在执行的过程
  • COMPLETING:表示任务执行完成或者异常结束,剩下的只是赋值而已
  • CANCELLED:任务在执行完成前取消了,至于中断状态只是用来描述取消之后任务的中断状态

所以,我们可以看到其对任务是否结束,以及是否被取消的判断非常简洁

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
public boolean isCancelled() {
return state >= CANCELLED;
}

public boolean isDone() {
return state != NEW;
}

1.2. waiters

waiters以栈的形式来维护阻塞在get()上的等待线程,对于获取到结果或者等待超时的线程节点应该删除,不过这里并没有用什么标识位来标识,而是直接将thread置为null就表示节点可以删除

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() {
thread = Thread.currentThread();
}
}

2. 构造器

构造器很简单,就是对callable进行一层包装,让它拥有Future的能力,如果是Runnable则先进行一下适配

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
8
9
10
11
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result); // 适配
this.state = NEW; // ensure visibility of callable
}

3. 接口

3.1. run

  1. 首先检查状态,并绑定线程:只有任务状态为New并且当前线程绑定成功,才执行任务;事实上在线程池中通过阻塞队列是能够保证任务是被执行线程独占的,但是不排除有其它将FutureTask交给多个线程共享的使用场景,所以这里还是做了一下防护。

  2. 然后对于执行无非两种结果,即正常结束或者异常结束,不过这里有个巧妙的地方就是对于结果outcome的发布没有使用任何线程安全手段,而是将它放在两次volatile变量操作之间,而volatile在Java中具有内存屏障的语义,所以对于观察线程,如果它能看到执行线程的volatile变量操作成功,就一定能看到这个操作之前操作的结果。

这里还有个细节就是第二次状态修改时,将volatile变量操作弱化成了普通变量的读写,然后在真正唤醒线程之前再进行volatile变量操作,这样就保证了线程被唤醒后将能看到状态和结果的赋值,并且也省去了一次内存屏障,降低了性能消耗。

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;

try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING) // 有外部线程请求中断,那么等待它最终将状态置为INTERRUPTED
handlePossibleCancellationInterrupt(s);
}
}

protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // volatile
outcome = t;
// putOrderedInt弱化了volatile变量写操作,因为获取结果线程还没有唤醒,这里立即去同步内存也没有意义
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion(); // 唤醒获取线程,并在唤醒之前也有volatile操作,即内存屏障
}
}

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}

// 真正唤醒获取线程之前,还有内存屏障操作,这就保证了上面结果和最终状态的赋值能被获取线程看到
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // volatile
for (;;) { // 遍历栈,逐个唤醒线程
Thread t = q.thread;
if (t != null) {
q.thread = null; // volatile
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done(); // 子类扩展

callable = null; // to reduce footprint
}

// 参考下面cancel(true)分析,语义就是让执行线程等待取消线程将状态置为INTERRUPTED后再退出run
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}

3.2. get

对于获取结果的线程,无非是先检查任务状态,如果已经结束则返回,否则进入阻塞等待。但是进入等待这个过程是有一些步骤的,比如创建节点,将节点入栈,而在这些步骤之间可能执行线程改变了任务状态,或者有其它获取线程也在并发入栈,这些都是要解决的问题。

主要思路是通过循环CAS操作来进行入栈,因为创建节点本身是不存在线程安全问题的,问题在于入栈操作,所以如果CAS失败了下次重试即可,并在重试之间检查任务的状态以及中断请求等。

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 如果任务还处于COMPLETING或之前的状态,则进入阻塞等待
s = awaitDone(false, 0L);
return report(s); // 返回结果
}


private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) { // 循环重试

if (Thread.interrupted()) { // 1. 检查是否有中断请求,如果有则抛出InterruptedException
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) { // 2. 检查是否已经结束,如果是则立即返回
if (q != null)
q.thread = null; // 如果之前循环中已经创建了WaitNode,则将thread置为空
return s;
}else if (s == COMPLETING) // 3. 检查是否为COMPLETING,如果是,那么应该很快结束,所以稍等下重试
Thread.yield();
else if (q == null) // 4. 否则状态肯定是NEW,说明还在执行或等待执行,那么先创建节点,准备入栈
q = new WaitNode();
else if (!queued) // 5. 考虑到并发调用,这里通过循环CAS操作,确保入栈,后面执行线程结束后会进行通知
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) { // 6.1. 限时阻塞,如果超时则在下次进入for循环后检测到deadline并退出
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state; // get(long timeout, TimeUnit unit) 会检查这里返回的state,如果没完成则抛出超时异常
}
LockSupport.parkNanos(this, nanos);
}else
LockSupport.park(this); // 6.2. 无限阻塞,直到被执行线程唤醒
}
}

private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null; // 先将thread置为空,将节点标记为待删除

retry:
for (;;) {
for (WaitNode pred = null, q = waiters, s; q != null; q = s) { // 每次删除都从头节点开始遍历
s = q.next;
if (q.thread != null) // 当前节点不需要删除,则继续向下遍历
pred = q;
else if (pred != null) { // 否则如果前驱不为null,那么让前驱跳过自己
pred.next = s;
if (pred.thread == null)
continue retry; // 如果发现前驱也需要删除,那么再重头开始
}else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) // 要删除的是头节点,那么CAS弹出
continue retry; // 检查后面的节点是否需要删除
}
break; // 没有再需要删除的节点,那么退出
}
}
}

private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x; // 正常结果

if (s >= CANCELLED) // 被取消了,包括 CANCELLED/INTERRUPTING/INTERRUPTED
throw new CancellationException();

throw new ExecutionException((Throwable)x); // 执行异常
}

3.3. cancel

对于取消操作,首先也是检查状态,只有状态为New的任务才支持取消操作,并且只能被一个线程取消一次。

如果mayInterruptIfRunningtrue,则对执行线程发起中断请求,并在中断动作前后将状态置为INTERRUPTINGINTERRUPTED来区分,最后唤醒等待线程,然后它们将会看到任务的状态并得到CancellationException。要注意的是这里对于执行线程是否真的结束并不关心,其真正语义是取消获取线程的等待,而不是任务线程的执行,仅仅是如果需要的话,会尝试一下中断。

java.util.concurrent.FutureTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW
&& UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;

try {
if (mayInterruptIfRunning) { // 如果需要中断,则请求中断执行线程
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // 将状态置为 INTERRUPTED
}
}
} finally {
finishCompletion(); // 唤醒在get()上阻塞的线程
}
return true;
}


参考: