publicstatic <T> Callable<T> callable(Runnable task, T result){ if (task == null) thrownew NullPointerException(); returnnew RunnableAdapter<T>(task, result); }
staticfinalclassRunnableAdapter<T> implementsCallable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call(){ task.run(); return result; } }
// Possible state transitions:
//   NEW -> COMPLETING -> NORMAL
//   NEW -> COMPLETING -> EXCEPTIONAL
//   NEW -> CANCELLED
//   NEW -> INTERRUPTING -> INTERRUPTED
publicvoidrun(){ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) // 有外部线程请求中断,那么等待它最终将状态置为INTERRUPTED handlePossibleCancellationInterrupt(s); } }
// 真正唤醒获取线程之前,还有内存屏障操作,这就保证了上面结果和最终状态的赋值能被获取线程看到 privatevoidfinishCompletion(){ for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // volatile for (;;) { // 遍历栈,逐个唤醒线程 Thread t = q.thread; if (t != null) { q.thread = null; // volatile LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }
done(); // 子类扩展
callable = null; // to reduce footprint }
// 参考下面cancel(true)分析,语义就是让执行线程等待取消线程将状态置为INTERRUPTED后再退出run privatevoidhandlePossibleCancellationInterrupt(int s){ if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); }
public V get()throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 如果任务还处于COMPLETING或之前的状态,则进入阻塞等待 s = awaitDone(false, 0L); return report(s); // 返回结果 }