在Java中,对于异步操作结果的获取 抽象出了一个接口Future<V>
,它提供了一些方法来检查或等待操作是否完成,以及获取操作的结果,同时还提供了取消操作的能力,以及提供了检测操作是正常结束还是被取消的方法。不过如果操作已经完成了,就不能再对其进行取消了。在某些场景下,如果只是想借助Future
实现操作取消的能力,而不关心操作结果,那么可以返回空即可。
A Future
represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get
when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel
method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future
for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?>
and return null
as a result of the underlying task.
其接口定义如下:
java.util.concurrent.Future 1 2 3 4 5 6 7 8 public interface Future <V > { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException ; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ;}
在Java中,一般会将异步操作封装成一个独立的任务,而任务的抽象是Runnable
,于是定义接口RunnableFuture
,那么它的实现就表示一个能异步获取结果的任务 ,具体的实现可以参考之前相关笔记:[JUC FutureTask ]
在Netty中,对原生Future
接口又进行了扩展,主要是在操作完成时添加了事件监听的能力 ,其实就是观察者模式的应用,这样就可以在操作结束时执行一些事件回调。
io.netty.util.concurrent.Future 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public interface Future <V > extends java .util .concurrent .Future <V > { boolean isSuccess () ; boolean isCancellable () ; Throwable cause () ; Future<V> addListener (GenericFutureListener<? extends Future<? super V>> listener) ; Future<V> addListeners (GenericFutureListener<? extends Future<? super V>>... listeners) ; Future<V> removeListener (GenericFutureListener<? extends Future<? super V>> listener) ; Future<V> removeListeners (GenericFutureListener<? extends Future<? super V>>... listeners) ; Future<V> sync () throws InterruptedException ; Future<V> syncUninterruptibly () ; Future<V> await () throws InterruptedException ; Future<V> awaitUninterruptibly () ; boolean await (long timeout, TimeUnit unit) throws InterruptedException ; boolean await (long timeoutMillis) throws InterruptedException ; boolean awaitUninterruptibly (long timeout, TimeUnit unit) ; boolean awaitUninterruptibly (long timeoutMillis) ; }
然后,Promise
在Future
的基础上又提供了对操作结果写的能力 ,也就是让执行线程之外的其它线程也可以修改操作的结果和状态
io.netty.util.concurrent.Promise 1 2 3 4 5 6 7 8 public interface Promise <V > extends Future <V > { Promise<V> setSuccess (V result) ; boolean trySuccess (V result) ; Promise<V> setFailure (Throwable cause) ; boolean tryFailure (Throwable cause) ; boolean setUncancellable () ; }
相关类继承结构部分如下所示:
要理解的是,Future
本身表示的只是一个动作,并没有任务的概念,如果在Java中经常使用线程池提交任务,由于其返回的结果也是通过Future
表示,那么可能会产生混淆,不过如果理解它返回的实际上是一个FutureTask
,只是Future
在任务上的一种实现就可以了。
如果一定要使用任务的形式,那么也Netty也提供了PromiseTask<V>
,它与FutureTask
类似,也是RunnableFuture
的一个实现。以上图为例,下面分析下其中几个类的实现和作用。
1. AbstractFuture 这里对于结果的获取实现非常简洁,如果操作没结束就进行等待,否则结束了再看是否有异常,决定是返回结果还是抛出异常。
io.netty.util.concurrent.AbstractFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public abstract class AbstractFuture <V > implements Future <V > { @Override public V get () throws InterruptedException, ExecutionException { await(); Throwable cause = cause(); if (cause == null ) { return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); } @Override public V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) { Throwable cause = cause(); if (cause == null ) { return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); } throw new TimeoutException(); } }
2. CompleteFuture CompleteFuture
是一种特殊的实现,从命名就可以看出其语义,表示一个已经完成的操作,所以其实现非常简洁,初始化时状态就是完成
io.netty.util.concurrent.CompleteFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public boolean isDone () { return true ; } @Override public boolean isCancellable () { return false ; } @Override public boolean isCancelled () { return false ; }
而完成的情况无非两种,成功返回结果,或者失败返回异常
对于成功的操作,在初始化时就赋值其结果,而异常则置为null
io.netty.util.concurrent.SucceededFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public final class SucceededFuture <V > extends CompleteFuture <V > { private final V result; public SucceededFuture (EventExecutor executor, V result) { super (executor); this .result = result; } @Override public Throwable cause () { return null ; } @Override public boolean isSuccess () { return true ; } @Override public V getNow () { return result; } }
对于失败的操作,则反过来在初始化时赋值异常原因,而结果置为null
io.netty.util.concurrent.FailedFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public final class FailedFuture <V > extends CompleteFuture <V > { private final Throwable cause; public FailedFuture (EventExecutor executor, Throwable cause) { super (executor); this .cause = ObjectUtil.checkNotNull(cause, "cause" ); } @Override public Throwable cause () { return cause; } @Override public boolean isSuccess () { return false ; } @Override public Future<V> sync () { PlatformDependent.throwException(cause); return this ; } @Override public Future<V> syncUninterruptibly () { PlatformDependent.throwException(cause); return this ; } @Override public V getNow () { return null ; } }
3. DefaultPromise DefaultPromise
的实现也比较简单,主要就两个角度,一个是获取结果的线程,如果操作没有结束,那么就进入等待。
io.netty.util.concurrent.DefaultPromise 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public Promise<V> await () throws InterruptedException { if (isDone()) { return this ; } if (Thread.interrupted()) { throw new InterruptedException(toString()); } checkDeadLock(); synchronized (this ) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this ; }
另一个是赋值线程,就是将操作状态置为完成的线程,不过这里通过CAS操作保证了结果只能被赋值一次,同时在赋值成功之后会回调所有注册的Listener事件,如果赋值线程不是 executor本身,那么将会异步执行回调事件。
io.netty.util.concurrent.DefaultPromise 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Override public Promise<V> setSuccess (V result) { if (setSuccess0(result)) { return this ; } throw new IllegalStateException("complete already: " + this ); } private boolean setSuccess0 (V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setValue0 (Object objResult) { if (RESULT_UPDATER.compareAndSet(this , null , objResult) || RESULT_UPDATER.compareAndSet(this , UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true ; } return false ; } private synchronized boolean checkNotifyWaiters () { if (waiters > 0 ) { notifyAll(); } return listeners != null ; } private void notifyListeners () { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1 ); try { notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return ; } } safeExecute(executor, new Runnable() { @Override public void run () { notifyListenersNow(); } }); }
4. PromiseTask PromiseTask
在DefaultPromise
的基础上将操作封装成了一个任务,以便可以将具体的操作交给某个执行线程来执行了,这样的话就应该只有执行执行线程了解任务的状态了,而其它线程无权修改任务的状态或结果,所以实现中关闭这些操作入口。
io.netty.util.concurrent.PromiseTask 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class PromiseTask <V > extends DefaultPromise <V > implements RunnableFuture <V > { @Override public final Promise<V> setFailure (Throwable cause) { throw new IllegalStateException(); } @Override public final boolean tryFailure (Throwable cause) { return false ; } @Override public final Promise<V> setSuccess (V result) { throw new IllegalStateException(); } @Override public final boolean trySuccess (V result) { return false ; } }
然后再将这些操作开放给了执行线程,由其负责管理任务的结果和状态。
io.netty.util.concurrent.PromiseTask 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void run () { try { if (setUncancellableInternal()) { V result = runTask(); setSuccessInternal(result); } } catch (Throwable e) { setFailureInternal(e); } } V runTask () throws Throwable { final Object task = this .task; if (task instanceof Callable) { return ((Callable<V>) task).call(); } ((Runnable) task).run(); return null ; }
参考: