参考文献

FutrueTask

  • FutureTask为Future提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等.如果任务尚未完成,获取任务执行结果时将会阻塞.一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算).FutureTask 常用来封装 Callable 和 Runnable,也可以作为一个任务提交到线程池中执行.除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用.FutureTask 的线程安全由CAS来保证.

Callable接口

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

Future接口

  • Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public interface Future<V> {

/**
*
* 用来取消异步任务的执行.如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false.如果任务还没有被执行,则会返回true并且异步任务不会被执行.
* 如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,
* 若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程.
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false.
*/
boolean isCancelled();

/**
* 判断任务是否已经完成,如果完成则返回true,否则返回false.
* 需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true
*/
boolean isDone();

/**
*
* 获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成.
* 如果任务被取消则会抛出CancellationException异常,如果任务执行过程发生异常则会抛出ExecutionException异常,
* 如果阻塞等待过程中被中断则会抛出InterruptedException异常
*/
V get() throws InterruptedException, ExecutionException;

/**
* 带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常.
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
// 表示是个新的任务或者还没被执行完的任务.这是初始状态
private static final int NEW = 0;
// 任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,
// 如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING.但是这个状态会时间会比较短,属于中间状态.
private static final int COMPLETING = 1;
// 任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL.这是一个最终态
private static final int NORMAL = 2;
// 任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL.这是一个最终态
private static final int EXCEPTIONAL = 3;
// 任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态.这是一个最终态
private static final int CANCELLED = 4;
// 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING.这是一个中间状态
private static final int INTERRUPTING = 5;
// 调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED.这是一个最终态.
// 有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)
private static final int INTERRUPTED = 6;

// 内部持有的Callable任务,运行完毕置空
/** The underlying callable; nulled out after running */
private Callable<V> callable;
// 从get()中返回的结果或抛出的异常
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
// 运行Callable的线程
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
// 使用Treiber栈保存等待线程
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

获取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果 state 还没到 set outcome 结果的时候,则调用 awaitDone() 方法阻塞自己
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 返回结果
return report(s);
}

/**
* get 方法支持超时限制,如果没有传入超时时间,则接受的参数是 false 和 0L
* 有等待就会有队列排队或者可响应中断,从方法签名上看有 InterruptedException,说明该方法这是可以被中断的
*
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算等待截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 如果当前线程被中断,如果是,则在等待对立中删除该节点,并抛出 InterruptedException
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
// 状态大于 COMPLETING 说明已经达到某个最终状态(正常结束/异常结束/取消)
// 把 thread 只为空,并返回结果
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 如果是COMPLETING 状态(中间状态),表示任务已结束,但 outcome 赋值还没结束,这时主动让出执行权,让其他线程优先执行(只是发出这个信号,至于是否别的线程执行一定会执行可是不一定的)
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 等待节点为空
else if (q == null) // TODO 第一轮for循环
// 将当前线程构造节点
q = new WaitNode();
// 如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
else if (!queued) // TODO 第二轮for循环
// 这个时候会把第一轮循环中生成的节点的 next 指针指向waiters,然后CAS的把节点q 替换waiters,
// 也就是把新生成的节点添加到waiters 中的首节点。如果替换成功,queued=true。第二轮循环结束
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果设置超时时间
else if (timed) { // TODO 第三轮for循环
nanos = deadline - System.nanoTime();
// 时间到,则不再等待结果
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞等待特定时间
LockSupport.parkNanos(this, nanos);
}
else
// 挂起当前线程,直到被其他线程唤醒
LockSupport.park(this);
}
}

private void removeWaiter(WaitNode node) {
if (node != null) {
// 首先置空线程
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
// 依次遍历查找
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

取消任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
// 如果当前状态不为NEW,则根据参数mayInterruptIfRunning决定是否在任务运行中也可以中断.中断操作完成后,调用finishCompletion移除并唤醒所有等待线程
try { // in case call to interrupt throws exception
// 可以在运行时中断
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 移除并唤醒所有等待线程
finishCompletion();
}
return true;
}