publicstatic <T> Callable<T> callable(Runnable task, T result) { if (task == null) thrownewNullPointerException(); returnnewRunnableAdapter<T>(task, result); }
staticfinalclassRunnableAdapter<T> implementsCallable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
// Possible state transitions: // NEW -> COMPLETING -> NORMAL // NEW -> COMPLETING -> EXCEPTIONAL // NEW -> CANCELLED // NEW -> INTERRUPTING -> INTERRUPTED
// 真正唤醒获取线程之前,还有内存屏障操作,这就保证了上面结果和最终状态的赋值能被获取线程看到 privatevoidfinishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // volatile for (;;) { // 遍历栈,逐个唤醒线程 Threadt= q.thread; if (t != null) { q.thread = null; // volatile LockSupport.unpark(t); } WaitNodenext= q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }
done(); // 子类扩展
callable = null; // to reduce footprint }
// 参考下面cancel(true)分析,语义就是让执行线程等待取消线程将状态置为INTERRUPTED后再退出run privatevoidhandlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); }
public V get()throws InterruptedException, ExecutionException { ints= state; if (s <= COMPLETING) // 如果任务还处于COMPLETING或之前的状态,则进入阻塞等待 s = awaitDone(false, 0L); return report(s); // 返回结果 }
privateintawaitDone(boolean timed, long nanos)throws InterruptedException { finallongdeadline= timed ? System.nanoTime() + nanos : 0L; WaitNodeq=null; booleanqueued=false; for (;;) { // 循环重试 if (Thread.interrupted()) { // 1. 检查是否有中断请求,如果有则抛出InterruptedException removeWaiter(q); thrownewInterruptedException(); }