前言
Future
接口和实现Future
接口的FutureTask
类,代表异步计算的结果. 简单点说就是实现有返回结果的task
, 实现Runnable
接口的线程没有提供获得线程返回的结果, 而FutureTask
实现了异步获得计算结果的一种方式, 也就是说可以先让一个线程去执行该task
后自己去干其他的事情,等到一段时间后可以来获取该task
的执行结果.
本文源码: 本文源码地址
例子
先使用一个例子简单看看
FutureTask
的使用.
package com.futuretask;import java.util.concurrent.Callable;public class FutureTaskTest02 { public static void main(String[] args) throws Exception { FutureTask<String> futureTask = new FutureTask< >(new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " starts to run."); Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " wakes up."); return "futurecall"; } }); Thread thread = new Thread(futureTask); thread.start(); System.out.println(Thread.currentThread().getName() + " finished to start thread."); System.out.println(Thread.currentThread().getName() + "->" + futureTask.get()); } }
由代码中可以看到初始化一个
Thread
时, 传入一个FutureTask
对象, 正常创建一个线程, 要传入一个Runnable
对象, 其实FutureTask
是Runnable
的一个子类. 所以就好理解了, 另外还注意的是FutureTask
对象传入了一个Callable
实例, 暂时可以理解call
方法为Runnable
里面的run
方法,是线程要执行的实体. 接着启动线程后在主线程中可以获得线程
中FutureTask
的结果.
输出结果如下: 可以看到
FutureTask
中可以得到线程执行结束后得到的结果.
main finished to start thread. Thread-0 starts to run. Thread-0 wakes up. main->futurecall
类结构
futureTask.png
可以看到
FutureTask
实现了RunnableFuture
接口, 然而RunnableFuture
接口继承了Runnable
接口和Future
接口. 同时类FutureTask
中使用了Callable
对象,Callable
接口定义了call
由用户实现并且注入到FutureTask
中.
由此可以猜测上例中
thread
中真正调用的是FutureTask
的run
方法, 而run
方法中实际调用了Callable
的call
方法并返回值, 关于取消获取返回值之类的方法都是FutureTask
定义了一些逻辑来实现了Future
的所有接口方法.
源码
接下来将分析一个
FutureTask
类.
属性
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
之前有说
FutureTask
是设计了一些逻辑来实现Future
接口中的方法. 这些逻辑的基本线就是基于state
,state
是表示当前线程执行该任务的一些状态值. 状态值就是代码中对应的那些值, 他们的状态值转换只有下面这四种可能性.
NEW -> COMPLETING -> NORMALNEW -> COMPLETING -> EXCEPTIONALNEW -> CANCELLEDNEW -> INTERRUPTING -> INTERRUPTED
接下来基于这四种可能性, 我们通过源码和例子共同来测试和查看, 弄明白每种可能性执行的代码逻辑. 最后加以总结.
构造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
这个就不多说了, 可以看到初始状态为
NEW
.
run 方法
接着看要执行的
run
方法.
/** * 最终运行的方法 */ public void run() { // 如果状态值不为NEW 表示已经有线程运行过该task了 因此返回 // 如果状态值为NEW 则设置RUNNER为当前线程 如果设置不成功也返回 if (state != NEW || // 1 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // 进入到这里 表明执行该task的是当前线程已经被设置到RUNNER变量中并且状态值state为NEW try { Callable<V> c = callable; if (c != null && state == NEW) { // 2 /** * result 接收callable的返回值 * ran 表示callable方法是否正确执行完成 */ V result; boolean ran; try { // 调用callable的方法call 并把结果放到result中 result = c.call(); ran = true; } catch (Throwable ex) { // 3 // call()方法出现异常执行的操作 result = null; ran = false; setException(ex); } // call()正确执行完执行的操作 if (ran) // 4 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() /** * 设置执行线程为null 不需要用CAS是因为前面的CAS会挡住其他的线程进入 */ runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts /** * 如果s >= INTERRUPTING 则调用handlePossibleCancellationInterrupt(s)方法 * 那什么时候会 s >= INTERRUPTING呢? * -> 调用cancel(true)并且没有在setException(ex)和set(result)发生前 */ int s = state; if (s >= INTERRUPTING) // 5 handlePossibleCancellationInterrupt(s); } }
按代码中顺序:
1. 如果状态值不为NEW
, 表明有线程已经在执行这个run
方法, 因此直接返回. 如果状态值为NEW
则把当前线程设置到runner
变量中,由于是多线程操作,为了保持线程间可见性,runner
变量是volatile
并且用CAS
设置.
2. 进入到第2
步, 为什么需要进行判断if (c != null && state == NEW)
呢? 这是因为如果刚刚结束完第1
步正在进入第2
步的过程中,别的线程启动了cancel(false/true)
方法(该方法会在后面分析,此时只需要知道这个即可), 都会导致state
不为NEW
的. 接着就开始执行callable
的call
方法, 用result
接受返回值, 用ran
表示call
方法是否完整顺利的执行.
3. 进入到第3
步, 表明call
方法出了异常没有正常顺利的执行完, 此时设置result
为null
,ran
为false
, 并且通过setException(ex)
进入到异常结束状态. 看如下代码:
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
可以看到会把异常对象传给
outcome
, 并且设置状态. 由此可见状态值是NEW -> COMPLETING -> EXCEPTIONAL
并且COMPLETING
是一个中间状态.finishCompletion()
是做一些收尾工作,会放到后面分析.
4. 如果
call
方法正常顺利执行完, 则会调用set(result)
设置正常结束状态. 看如下代码段:
protected void set(V v) { //System.out.println(" set after state:" + state); if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //System.out.println(" set before state:" + state); UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
可以看到
outcome
是保存最终的结果包括返回值或者异常对象. 由此可见状态值是NEW -> COMPLETING -> NORMAL
并且COMPLETING
是一个中间状态.finishCompletion()
会放到后面分析.
5. 这个是
finally
执行片段, 这段代码会把runner
设置为null
, 这里不需要用CAS
是因为通过上面的片段,只有放到runner
的那个线程才可以执行到try...finally...
的片段. 上面也有提到过如果在第1
步和第2
步之间发生了cancel
方法. 为了可以更加清晰的明白此代码段,看一下cancel
方法的代码及其作用:
作者:nicktming
链接:https://www.jianshu.com/p/92d01d21ce0e