参考文献

AbstractExecutorService源码解析

submit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
// 通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

invokeAny方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* the main mechanics of invokeAny.
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
// 提交任务为空,抛出空指针异常
if (tasks == null)
throw new NullPointerException();
// 记录待执行的任务的剩余数量
int ntasks = tasks.size();
// 任务集合中的数据为空,抛出非法参数异常
if (ntasks == 0)
throw new IllegalArgumentException();
// 以当前实例对象作为参数构建ExecutorCompletionService对象
// ExecutorCompletionService负责执行任务,后面调用poll返回第一个执行结果
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.

try {
// 记录可能抛出的执行异常
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
// 初始化超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();

// 提交任务,并将返回的结果数据添加到futures集合中
// 提交一个任务主要是确保在进入循环之前开始一个任务
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
// 记录正在执行的任务数量
int active = 1;

for (;;) {
// 从完成任务的BlockingQueue队列中获取并移除下一个将要完成的任务的结果
// 如果BlockingQueue队列中的数据为空,则返回null
// 这里的poll()方法是非阻塞方法
Future<T> f = ecs.poll();
// 获取的结果为空
if (f == null) {
// 集合中仍有未执行的任务数量
if (ntasks > 0) {
// 未执行的任务数量减一
--ntasks;
// 提交完成并将结果添加到futures集合中
futures.add(ecs.submit(it.next()));
// 正在执行的任务数量加1
++active;
}
// 所有任务执行完成,并返回结果数据,则退出循环
// 之所以处理active为0的情况,是因poll()方法是非阻塞方法,可能导致未返回结果时active为0
else if (active == 0)
break;
// 如果timed为true,则执行获取结果数据时设置超时时间,也就是超时获取结果表示
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
// 没有设置超时,并且所有任务都被提交了,则一直阻塞,直到返回一个执行结果
else
f = ecs.take();
}
// 获取到执行结果,则将正在执行的任务减一,从Future中获取结果并返回
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
// 如果从所有执行的任务中获取到一个结果数据,则取消所有执行的任务,不再向下执行
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

invokeAll方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
// 表示所有任务是否完成
boolean done = false;
try {
// 遍历所有任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
// 将结果数据添加到futures集合中
futures.add(f);
// 执行任务
execute(f);
}
// 遍历结果数据集合
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
// 任务没有完成
if (!f.isDone()) {
try {
// 阻塞等待任务完成并返回结果
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
// 任务完成(不管正常结束还是异常完成)
done = true;
// 返回结果数据集合
return futures;
} finally {
// 如果发生中断异常InterruptedException则取消已经提交的任务
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

final long deadline = System.nanoTime() + nanos;
final int size = futures.size();

// 遍历所有任务
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
// 在添加执行任务时超时判断,如果超时则立即返回futures集合
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
// 在添加执行任务时进行超时判断,如果超时,则立刻返回 futures 集合
return futures;
}

// 遍历所有任务
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
// 对结果进行判断时进行超时判断
if (nanos <= 0L)
// 每次对结果数据进行判断时添加了超时处理逻辑
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
// 重置任务的超时时间
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

ThreadPoolExecutor源码解析

向线程池提交任务

1
2
3
4
5
6
7
8
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象.
// 通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

使用ThreadPoolExecutor.execute()方法来执行提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 执行任务
public void execute(Runnable command){};

// 提交任务task,用返回值Future获取任务执行结果
<T> Future<T> submit(Callable<T> task);

// 提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void execute(Runnable command) {
// command为null,抛出NullPointerException
if (command == null)
throw new NullPointerException();
// 获取ctl
int c = ctl.get();
// 判断当前启用的线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 为该任务分配线程
if (addWorker(command, true))
// 分配成功就返回
return;
// 分配失败再次获取ctl
c = ctl.get();
}
// 分配和信息线程失败以后
// 如果池状态为RUNNING并且插入到任务队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 双重检测,可能在添加后线程池状态变为了非RUNNING
int recheck = ctl.get();
// 如果池状态为非RUNNING,则不会执行新来的任务
// 将该任务从阻塞队列中移除
if (! isRunning(recheck) && remove(command))
// 调用拒绝策略,拒绝该任务的执行
reject(command);
// 如果没有正在运行的线程
else if (workerCountOf(recheck) == 0)
// 就创建新线程来执行该任务
addWorker(null, false);
}// 阻塞队列已满,尝试创建新的线程,如果超过maximumPoolSize,执行handler.rejectExecution()
else if (!addWorker(command, false))
reject(command);
}
  • addWorker()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
    int c = ctl.get();
    // 拿到ctl中高三位的值
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 如果当前线程池状态处于关闭状态(即SHUTDOWN及其以上状态),并且以下三个条件之一成立:
    // 1. 当前线程池状态为STOP及其以上状态;
    // 2. 第一个任务不为空;
    // 3. 工作队列为空;
    // 则创建新线程失败
    if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
    firstTask == null &&
    ! workQueue.isEmpty()))
    // 创建新线程失败
    return false;

    for (;;) {
    // 获得当前工作线程数
    int wc = workerCountOf(c);

    // 参数中 core 为true
    // CAPACITY 为 1 << COUNT_BITS-1,一般不会超过
    // 如果工作线程数大于了核心线程数,则创建失败
    if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;
    // 通过CAS操作改变c的值
    if (compareAndIncrementWorkerCount(c))
    // 更改成功就跳出多重循环,且不再运行循环
    break retry;
    // 更改失败,重新获取ctl的值
    c = ctl.get(); // Re-read ctl
    if (runStateOf(c) != rs)
    // 跳出多重循环,且重新进入循环
    continue retry;
    // else CAS failed due to workerCount change; retry inner loop
    }
    }

    // 用于标记work中的任务是否成功执行
    boolean workerStarted = false;
    // 用于标记worker是否成功加入了线程池中
    boolean workerAdded = false;
    Worker w = null;
    try {
    // 创建新线程来执行任务
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
    final ReentrantLock mainLock = this.mainLock;
    // 加锁
    mainLock.lock();
    try {
    // Recheck while holding lock.
    // Back out on ThreadFactory failure or if
    // shut down before lock acquired.
    // 加锁的同时再次检测
    // 避免在释放锁之前调用了shut down
    int rs = runStateOf(ctl.get());

    if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive()) // precheck that t is startable
    throw new IllegalThreadStateException();
    // 将线程添加到线程池中
    workers.add(w);
    int s = workers.size();
    if (s > largestPoolSize)
    largestPoolSize = s;
    // 添加成功标志位变为true
    workerAdded = true;
    }
    } finally {
    mainLock.unlock();
    }
    // 如果worker成功加入了线程池,就执行其中的任务
    if (workerAdded) {
    t.start();
    // 启动成功
    workerStarted = true;
    }
    }
    } finally {
    // 如果执行失败
    if (! workerStarted)
    // 调用添加失败的函数
    addWorkerFailed(w);
    }
    return workerStarted;
    }

关闭线程池

  • 执行shutdown()方法会执行以下操作:

    1. 不再接受新的任务,新提交的任务将被拒绝(rejected)
    2. 等待所有已经提交的任务执行完成.
    3. 中断所有正在执行的任务.
    4. 释放线程池中所有的资源.
  • shutdown()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    /**
    * 将线程池的状态改为 SHUTDOWN
    * 不再接受新任务,但是会将阻塞队列中的任务执行完
    */
    public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    checkShutdownAccess();

    // 修改线程池状态为 SHUTDOWN
    advanceRunState(SHUTDOWN);

    // 中断空闲线程(没有执行任务的线程)
    // Idle:空闲的
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
    mainLock.unlock();
    }
    // 尝试终结,不一定成功
    tryTerminate();
    }

    private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    // 遍历所有worker
    for (Worker w : workers) {
    Thread t = w.thread;
    // 若当前t没有被中断,先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
    // 注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
    // 它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
    if (!t.isInterrupted() && w.tryLock()) {
    try {
    t.interrupt();
    } catch (SecurityException ignore) {
    } finally {
    w.unlock();
    }
    }
    if (onlyOne)
    break;
    }
    } finally {
    mainLock.unlock();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    final void tryTerminate() {
    for (;;) {
    int c = ctl.get();
    // 终结失败的条件
    // 线程池状态为RUNNING
    // 线程池状态为 RUNNING SHUTDOWN STOP (状态值大于TIDYING)
    // 线程池状态为SHUTDOWN,但阻塞队列中还有任务等待执行
    if (isRunning(c) ||
    runStateAtLeast(c, TIDYING) ||
    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
    return;

    // 如果活跃线程数不为0
    if (workerCountOf(c) != 0) { // Eligible to terminate
    // 中断空闲线程
    interruptIdleWorkers(ONLY_ONE);
    return;
    }

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    // 处于可以终结的状态
    // 通过CAS将线程池状态改为TIDYING
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    try {
    terminated();
    } finally {
    // 通过CAS将线程池状态改为TERMINATED
    ctl.set(ctlOf(TERMINATED, 0));
    termination.signalAll();
    }
    return;
    }
    } finally {
    mainLock.unlock();
    }
    // else retry on failed CAS
    }
    }
  • shutdownNow

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /**
    * 将线程池的状态改为 STOP
    * 不再接受新任务,也不会在执行阻塞队列中的任务
    * 会将阻塞队列中未执行的任务返回给调用者
    */
    public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    checkShutdownAccess();

    // 修改状态为STOP,不执行任何任务
    advanceRunState(STOP);

    // 中断所有线程
    interruptWorkers();

    // 将未执行的任务从队列中移除,然后返回给调用者
    tasks = drainQueue();
    } finally {
    mainLock.unlock();
    }
    // 尝试终结,一定会成功,因为阻塞队列为空了
    tryTerminate();
    return tasks;
    }

任务调度线程池

  • 在"任务调用线程池"功能加入之前,可以使用java.util.Timer来实现定时功能,Timer的优点在于简单易用,但由于所有任务都是有一个同一个线程来调度,因此所有任务都是串行执行的,同一时间只有一个任务执行,前一个任务的延迟或异常将会影响到之后的任务;
Timer实现方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class TimerTest {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask timerTask1 = new TimerTask() {
@Override
public void run() {
log.info("task 1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
TimerTask timerTask2 = new TimerTask() {
@Override
public void run() {
log.info("task 2");
}
};
// 使用timer添加两个任务,希望它们都是1s执行
// 但由于timer内只有一个线程来顺序执行队列中任务,因此[任务1]的延迟,影响了[任务2]的执行
timer.schedule(timerTask1,1000);
timer.schedule(timerTask2,1000);
}
}

21:01:24.204 [Timer-0] INFO com.holelin.sundry.test.thread.TimerTest - task 1
21:01:25.216 [Timer-0] INFO com.holelin.sundry.test.thread.TimerTest - task 2
ScheduledExecutorService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在1s后执行
executor.schedule(() -> {
log.info("任务1,执行时间:{}", new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
log.info("任务2,执行时间:{}", new Date());
}, 1000, TimeUnit.MILLISECONDS);

21:00:09.547 [pool-1-thread-1] INFO com.holelin.sundry.test.thread.TimerTest - 任务1,执行时间:Tue Jun 22 21:00:09 CST 2021
21:00:09.547 [pool-1-thread-2] INFO com.holelin.sundry.test.thread.TimerTest - 任务2,执行时间:Tue Jun 22 21:00:09 CST 2021

// public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
// 上个任务结束-->延迟-->下一个任务开始
// public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
  • 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队,任务执行完毕,这些线程也不会释放,用来执行延迟或反复执行的任务
ScheduledFutureTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {

// 当两个任务有相同的延迟时间时,按照 FIFO 的顺序入队.sequenceNumber 就是为相同延时任务提供的顺序编号
/** Sequence number to break ties FIFO */
private final long sequenceNumber;

// 任务可以执行的时间,纳秒级 通过triggerTime方法计算得出
/** The time the task is enabled to execute in nanoTime units */
private long time;

// 重复任务的执行周期时间,纳秒级 正数表示固定速率执行(为scheduleAtFixedRate提供服务),负数表示固定延迟执行(为scheduleWithFixedDelay提供服务),0表示不重复任务
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;

// 重新入队的任务 通过reExecutePeriodic方法入队重新排序
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;

// 延迟队列的索引,以支持更快的取消操作
/**
* Index into delay queue, to support faster cancellation.
*/
int heapIndex;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void run() {
// 是否是周期任务
boolean periodic = isPeriodic();
// 当前状态是否可以执行
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 不是周期任务,直接执行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次运行时间
setNextRunTime();
// 重排序一个周期任务
reExecutePeriodic(outerTask);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
/**
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime() {
long p = period;
// 固定速率执行,scheduleAtFixedRate
if (p > 0)
time += p;
else
// 固定延迟执行,scheduleWithFixedDelay
time = triggerTime(-p);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Requeues a periodic task unless current run state precludes it.
* Same idea as delayedExecute except drops task rather than rejecting.
*
* @param task the task
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 任务入队
super.getQueue().add(task);
// 重新检查run-after-shutdown参数,如果不能继续运行就移除队列任务,并取消任务的执行
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 启动一个新的线程等待任务
ensurePrestart();
}
}

正确处理执行任务异常

  • 主动捕获异常

  • Future

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Slf4j
    public class ThreadExceptionTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<Boolean> task = pool.submit(() -> {
    log.info("task1");
    int i = 1 / 0;
    return true;
    });
    log.info("result:{}", task.get());
    }
    }

ScheduledThreadPoolExecutor源码解析

  • ScheduledThreadPoolExecutor继承自 ThreadPoolExecutor,为任务提供延迟或周期执行,属于线程池的一种和ThreadPoolExecutor 相比,它还具有以下几种特性:
    • 使用专门的任务类型—ScheduledFutureTask 来执行周期任务,也可以接收不需要时间调度的任务(这些任务通过 ExecutorService 来执行).
    • 使用专门的存储队列—DelayedWorkQueue 来存储任务,DelayedWorkQueue 是无界延迟队列DelayQueue 的一种.相比ThreadPoolExecutor也简化了执行机制(delayedExecute方法,后面单独分析).
    • 支持可选的run-after-shutdown参数,在池被关闭(shutdown)之后支持可选的逻辑来决定是否继续运行周期或延迟任务.并且当任务(重新)提交操作与 shutdown 操作重叠时,复查逻辑也不相同.

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {

/*
* This class specializes ThreadPoolExecutor implementation by
*
* 1. Using a custom task type, ScheduledFutureTask for
* tasks, even those that don't require scheduling (i.e.,
* those submitted using ExecutorService execute, not
* ScheduledExecutorService methods) which are treated as
* delayed tasks with a delay of zero.
*
* 2. Using a custom queue (DelayedWorkQueue), a variant of
* unbounded DelayQueue. The lack of capacity constraint and
* the fact that corePoolSize and maximumPoolSize are
* effectively identical simplifies some execution mechanics
* (see delayedExecute) compared to ThreadPoolExecutor.
*
* 3. Supporting optional run-after-shutdown parameters, which
* leads to overrides of shutdown methods to remove and cancel
* tasks that should NOT be run after shutdown, as well as
* different recheck logic when task (re)submission overlaps
* with a shutdown.
*
* 4. Task decoration methods to allow interception and
* instrumentation, which are needed because subclasses cannot
* otherwise override submit methods to get this effect. These
* don't have any impact on pool control logic though.
*/

/**
* continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown是
* ScheduledThreadPoolExecutor 定义的 run-after-shutdown 参数,用来控制池关闭之后的任务执行逻辑
*/
// 关闭后继续执行已经存在的周期任务
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

// 关闭后继续执行已经存在的延时任务
/**
* False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

/**
* 用来控制任务取消后是否从队列中移除.当一个已经提交的周期或延迟任务在运行之前被取消,那么它之后将不会运行.
* 默认配置下,这种已经取消的任务在届期之前不会被移除.通过这种机制,可以方便检查和监控线程池状态,但也可能导致已经取消的任务无限滞留.
* 为了避免这种情况的发生,我们可以通过setRemoveOnCancelPolicy方法设置移除策略,把参数removeOnCancel设为true可以在任务取消后立即从队列中移除.
*/
/**
* True if ScheduledFutureTask.cancel should remove from queue
*/
private volatile boolean removeOnCancel = false;

// 是为相同延时的任务提供的顺序编号,保证任务之间的 FIFO 顺序.与 ScheduledFutureTask 内部的sequenceNumber参数作用一致
/**
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
private static final AtomicLong sequencer = new AtomicLong();

方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* 延时delay时间来执行command任务,只执行一次
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}

/**
* 延时delay时间来执行callable任务,只执行一次
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
// 构造ScheduledFutureTask任务
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
// 任务执行主方法
delayedExecute(t);
return t;
}

/**
* 延时initialDelay时间首次执行command任务,之后每隔period时间执行一次
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 构建RunnableScheduledFuture任务类型
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),// 计算任务的延迟时间
unit.toNanos(period));// 计算任务的执行周期
// 执行用户自定义逻辑
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 赋值给outerTask,准备重新入队等待下一次执行
sft.outerTask = t;
// 执行任务
delayedExecute(t);
return t;
}

/**
* 延时initialDelay时间首次执行command任务,之后每延时delay时间执行一次
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
  • onShutdown
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
// 获取run-after-shutdown参数
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic) {
// 池关闭后不保留任务
// 依次取消任务
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
// 清除等待队列
q.clear();
}
else {
// 池关闭后保留任务
// Traverse snapshot to avoid iterator exceptions
// 遍历快照以避免迭代器异常
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
// 如果任务已经取消,移除队列中的任务
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}