Netty Future & Promise

———— 4.1.72.Final
words: 2k    views:    time: 9min

在Java中,对于异步操作结果的获取抽象出了一个接口Future<V>,它提供了一些方法来检查或等待操作是否完成,以及获取操作的结果,同时还提供了取消操作的能力,以及提供了检测操作是正常结束还是被取消的方法。不过如果操作已经完成了,就不能再对其进行取消了。在某些场景下,如果只是想借助Future实现操作取消的能力,而不关心操作结果,那么可以返回空即可。

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.
The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

其接口定义如下:

java.util.concurrent.Future
1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

在Java中,一般会将异步操作封装成一个独立的任务,而任务的抽象是Runnable,于是定义接口RunnableFuture,那么它的实现就表示一个能异步获取结果的任务,具体的实现可以参考之前相关笔记:[JUC FutureTask]

在Netty中,对原生Future接口又进行了扩展,主要是在操作完成时添加了事件监听的能力,其实就是观察者模式的应用,这样就可以在操作结束时执行一些事件回调。

io.netty.util.concurrent.Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); // Listener维护
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> sync() throws InterruptedException; // 同步等待任务结束
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
}

然后,PromiseFuture的基础上又提供了对操作结果写的能力,也就是让执行线程之外的其它线程也可以修改操作的结果和状态

io.netty.util.concurrent.Promise
1
2
3
4
5
6
7
8
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();
// ...
}

相关类继承结构部分如下所示:

要理解的是,Future本身表示的只是一个动作,并没有任务的概念,如果在Java中经常使用线程池提交任务,由于其返回的结果也是通过Future表示,那么可能会产生混淆,不过如果理解它返回的实际上是一个FutureTask,只是Future在任务上的一种实现就可以了。

如果一定要使用任务的形式,那么也Netty也提供了PromiseTask<V>,它与FutureTask类似,也是RunnableFuture的一个实现。以上图为例,下面分析下其中几个类的实现和作用。

1. AbstractFuture

这里对于结果的获取实现非常简洁,如果操作没结束就进行等待,否则结束了再看是否有异常,决定是返回结果还是抛出异常。

io.netty.util.concurrent.AbstractFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractFuture<V> implements Future<V> {

@Override
public V get() throws InterruptedException, ExecutionException {
await();

Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}

2. CompleteFuture

CompleteFuture是一种特殊的实现,从命名就可以看出其语义,表示一个已经完成的操作,所以其实现非常简洁,初始化时状态就是完成

io.netty.util.concurrent.CompleteFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public boolean isDone() {
return true;
}

@Override
public boolean isCancellable() {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

而完成的情况无非两种,成功返回结果,或者失败返回异常

  • 对于成功的操作,在初始化时就赋值其结果,而异常则置为null
io.netty.util.concurrent.SucceededFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final class SucceededFuture<V> extends CompleteFuture<V> {
private final V result;

public SucceededFuture(EventExecutor executor, V result) {
super(executor);
this.result = result;
}

@Override
public Throwable cause() {
return null;
}

@Override
public boolean isSuccess() {
return true;
}

@Override
public V getNow() {
return result;
}
}
  • 对于失败的操作,则反过来在初始化时赋值异常原因,而结果置为null
io.netty.util.concurrent.FailedFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final class FailedFuture<V> extends CompleteFuture<V> {

private final Throwable cause;

public FailedFuture(EventExecutor executor, Throwable cause) {
super(executor);
this.cause = ObjectUtil.checkNotNull(cause, "cause");
}

@Override
public Throwable cause() {
return cause;
}

@Override
public boolean isSuccess() {
return false;
}

@Override
public Future<V> sync() {
PlatformDependent.throwException(cause);
return this;
}

@Override
public Future<V> syncUninterruptibly() {
PlatformDependent.throwException(cause);
return this;
}

@Override
public V getNow() {
return null;
}
}

3. DefaultPromise

DefaultPromise的实现也比较简单,主要就两个角度,一个是获取结果的线程,如果操作没有结束,那么就进入等待。

io.netty.util.concurrent.DefaultPromise
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) { // 检查是否已经结束,即结果是否赋值
return this;
}

if (Thread.interrupted()) { // 进入阻塞之前先检查下中断
throw new InterruptedException(toString());
}

checkDeadLock(); // 当前线程不可以是 executor 绑定的线程

synchronized (this) { // 内置条件队列阻塞,并通过一个计数来限制最大等待的线程数
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}

另一个是赋值线程,就是将操作状态置为完成的线程,不过这里通过CAS操作保证了结果只能被赋值一次,同时在赋值成功之后会回调所有注册的Listener事件,如果赋值线程不是 executor本身,那么将会异步执行回调事件。

io.netty.util.concurrent.DefaultPromise
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
// 设置结果,只有result的值为null或UNCANCELLABLE时,才可以进行设置
if (RESULT_UPDATER.compareAndSet(this, null, objResult)
|| RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) { // 唤醒等待的线程,并检查是否有需要执行的Listener
notifyListeners(); // 回调所有的Listener
}
return true;
}
return false;
}

private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}

private void notifyListeners() {
// 如果当前线程就是executor所绑定的线程,那么直接执行
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}

// 否则封装成任务,提交给executor进行执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

4. PromiseTask

PromiseTaskDefaultPromise的基础上将操作封装成了一个任务,以便可以将具体的操作交给某个执行线程来执行了,这样的话就应该只有执行执行线程了解任务的状态了,而其它线程无权修改任务的状态或结果,所以实现中关闭这些操作入口。

io.netty.util.concurrent.PromiseTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {

@Override
public final Promise<V> setFailure(Throwable cause) {
throw new IllegalStateException();
}

@Override
public final boolean tryFailure(Throwable cause) {
return false;
}

@Override
public final Promise<V> setSuccess(V result) {
throw new IllegalStateException();
}

@Override
public final boolean trySuccess(V result) {
return false;
}

// ... ...
}

然后再将这些操作开放给了执行线程,由其负责管理任务的结果和状态。

io.netty.util.concurrent.PromiseTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void run() {
try {
if (setUncancellableInternal()) {
V result = runTask();
setSuccessInternal(result);
}
} catch (Throwable e) {
setFailureInternal(e);
}
}

V runTask() throws Throwable {
final Object task = this.task;
if (task instanceof Callable) {
return ((Callable<V>) task).call();
}
((Runnable) task).run();
return null;
}


参考: