参考文献

线程池

合理使用线程池的好处

  • 降低资源消耗.

    • 通过重复利用已经创建的线程降低线程创建的和销毁造成的消耗.例如,工作线程Woker会无线循环获取阻塞队列中的任务来执行.
  • 提高响应速度.

    • 当任务到达时,任务可以不需要等到线程创建就能立即执行.
  • 提高线程的可管理性.

    • 线程是稀缺资源,Java的线程池可以对线程资源进行统一分配、调优和监控.

实现线程池

  • 一般的线程池主要分为以下4个组成部分

    • 线程池管理器:用于创建并管理线程池
    • 工作线程:线程池中的线程
    • 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
    • 任务队列:用于存放待处理的任务,提供一种缓冲机制
  • 自定义拒绝策略接口

    1
    2
    3
    4
    @FunctionalInterface
    public interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
    }
  • 自定义任务队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    package com.holelin.sundry.test.thread;

    import lombok.extern.slf4j.Slf4j;

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    @Slf4j
    public class BlockingQueue<T> {
    /**
    * 任务队列
    */
    private Deque<T> queue = new ArrayDeque<>();
    /**
    * 锁
    */
    private ReentrantLock lock = new ReentrantLock();
    /**
    * 生产者条件变量
    */
    private Condition fullWaitSet = lock.newCondition();
    /**
    * 消费者条件变量
    */
    private Condition emptyWaitSet = lock.newCondition();

    /**
    * 容量
    */
    private int capacity;

    /**
    * 带超时阻塞获取
    *
    * @param timeout
    * @param unit
    * @return
    */
    public T poll(long timeout, TimeUnit unit) {
    lock.lock();
    try {
    long nanos = unit.toNanos(timeout);
    while (queue.isEmpty()) {
    try {
    if (nanos <= 0) {
    return null;
    }
    emptyWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {

    }
    }
    T t = queue.removeFirst();
    fullWaitSet.signal();
    return t;
    } finally {
    lock.unlock();
    }
    }

    /**
    * 阻塞获取
    *
    * @return
    */
    public T take() {
    lock.lock();
    try {
    while (queue.isEmpty()) {
    try {
    emptyWaitSet.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    T t = queue.removeFirst();
    fullWaitSet.signal();
    return t;
    } finally {
    lock.unlock();
    }
    }

    public void put(T task) {
    lock.lock();
    try {
    while (queue.size() == capacity) {
    log.info("等待加入任务队列:{}", task);
    try {
    fullWaitSet.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    log.info("加入任务队列:{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    } finally {
    lock.unlock();
    }
    }

    public boolean offer(T task, long timeout, TimeUnit unit) {
    lock.lock();
    try {
    long nanos = unit.toNanos(timeout);
    while (queue.size() == capacity) {
    if (nanos <= 0) {
    return false;
    }
    log.info("等待加入任务队列:{}", task);
    try {
    nanos = fullWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    log.info("加入任务队列:{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    return true;
    } finally {
    lock.unlock();
    }
    }

    public int size() {
    lock.lock();
    try {
    return queue.size();
    } finally {
    lock.unlock();
    }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
    lock.lock();
    try {
    if (queue.size()==capacity){
    rejectPolicy.reject(this,task);
    }else {
    log.info("加入任务队列:{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    }
    } finally {
    lock.unlock();
    }
    }
    }
  • 自定义线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    package com.holelin.sundry.test.thread;

    import lombok.extern.slf4j.Slf4j;

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;

    @Slf4j
    public class ThreadPool {
    /**
    * 任务队列
    */
    private BlockingQueue<Runnable> taskQueue;
    /**
    * 线程集合
    */
    private HashSet<Worker> workers = new HashSet<>();
    /**
    * 核心线程数
    */
    private int coreSize;
    /**
    * 获取任务时的超时时间
    */
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy<Runnable> rejectPolicy;

    public void execute(Runnable task) {
    synchronized (workers) {
    if (workers.size() < coreSize) {
    Worker worker = new Worker(task);
    log.info("新增worker{},{}", worker, task);
    workers.add(worker);
    worker.start();
    } else {
    // taskQueue.put(task);
    // 1. 死等
    // 2. 带超时时间等待
    // 3. 让调用者放弃任务执行
    // 4. 让调用者抛出异常
    // 5. 让调用者自己执行任务
    taskQueue.tryPut(rejectPolicy, task);
    }
    }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
    this.workers = workers;
    this.coreSize = coreSize;
    this.timeout = timeout;
    this.timeUnit = timeUnit;
    this.taskQueue = new BlockingQueue<>(queueCapacity);
    this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread {
    private Runnable task;

    public Worker(Runnable task) {
    this.task = task;
    }

    @Override
    public void run() {
    /**
    * 执行任务
    * 1. 当task不为空,执行任务
    * 2. 当task执行完毕,再接着从任务队列中获取任务并执行
    */
    while (Objects.nonNull(task) || Objects.nonNull(task = taskQueue.poll(timeout, timeUnit))) {
    try {
    log.info("正在执行...{}", task);
    task.run();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    task = null;
    }
    }
    synchronized (workers) {
    log.info("worker被移除{}", this);
    workers.remove(this);
    }
    }
    }
    }

  • 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    @Slf4j
    public class ThreadPoolTest {
    public static void main(String[] args) {
    ThreadPool threadPool = new ThreadPool(1, 100, TimeUnit.MILLISECONDS, 1, ((queue, task) -> {
    // 1. 死等
    // queue.put(task);
    // 2. 带超时时间等待
    // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
    // 3. 让调用者放弃任务执行
    // log.info("放弃执行:{}", task);
    // 4. 让调用者抛出异常
    // throw new RuntimeException("任务执行失败" + task);
    // 5. 让调用者自己执行任务
    task.run();
    }));
    for (int i = 0; i < 4; i++) {
    int j = i;
    threadPool.execute(() -> {
    try {
    Thread.sleep(1000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("{}", j);
    });
    }
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class ThreadPoolExecutorTest {
    private static AtomicInteger threadId = new AtomicInteger(0);
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAXIMUM_POOL_SIZE = 10;
    private static final long KEEP_ALIVE_TIME = 10;

    public static void main(String[] args) {
    // 手动创建线程池
    // 创建有界阻塞队列
    ArrayBlockingQueue<Runnable> runnable = new ArrayBlockingQueue<Runnable>(10);
    // 创建线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
    Thread thread = new Thread(r, "working_thread_" + threadId.getAndIncrement());
    return thread;
    }
    };

    // 手动创建线程池
    // 拒绝策略采用默认策略
    ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, runnable, threadFactory);

    for (int i = 0; i < 20; i++) {
    executor.execute(new Runnable() {
    @Override
    public void run() {
    System.out.println(Thread.currentThread());
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    }
    }
    }

线程池继承体系

1
ThreadPoolExecutor --> AbstractExecutorService --> ExecutorService --> Excutor
  • 顶级接口Executor提供了一种方式,解耦任务的提交和执行,只定义了一个 execute(Runnable command) 方法用来提交任务,至于具体任务怎么执行则交给他的实现者去自定义实现.
  • ExecutorService 接口继承 Executor,且扩展了生命周期管理的方法、返回 Futrue 的方法、批量提交任务的方法.
  • AbstractExecutorService 抽象类继承 ExecutorService 接口,对 ExecutorService 相关方法提供了默认实现,用 RunnableFuture 的实现类 FutureTask 包装 Runnable 任务,交给 execute() 方法执行,然后可以从该 FutureTask 阻塞获取执行结果,并且对批量任务的提交做了编排.
  • ThreadPoolExecutor 继承AbstractExecutorService,采用池化思想管理一定数量的线程来调度执行提交的任务,且定义了一套线程池的生命周期状态,用一个 ctl 变量来同时保存当前池状态(高3位)和当前池线程数(低29位).

线程池的工作流程

  • 一个新的任务到线程池时,线程池的处理流程如下:

    • 当一个任务通过submit或者execute方法提交到线程池的时候,如果当前池中线程数(包括闲置线程)小于coolPoolSize,则创建一个线程执行该任务.

    • 如果当前线程池中线程数已经达到coolPoolSize,则将任务放入等待队列.

    • 如果任务不能入队,说明等待队列已满,若当前池中线程数小于maximumPoolSize,则创建一个临时线程(非核心线程)执行该任务.

    • 如果当前池中线程数已经等于maximumPoolSize,此时无法执行该任务,根据拒绝执行策略处理.

注意:当池中线程数大于coolPoolSize,超过keepAliveTime时间的闲置线程会被回收掉.回收的是非核心线程,核心线程一般是不会回收的.如果设置allowCoreThreadTimeOut(true),则核心线程在闲置keepAliveTime时间后也会被回收.

img

img

img

  • 上图来源于https://dayarch.top/p/why-we-need-to-use-threadpool.html

工作线程(Worker)

  • 线程池在创建线程时,会将线程封装成工作线程Woker.Woker在执行完任务后,不是立即销毁而是循环获取阻塞队列里的任务来执行.

线程池的创建(7个参数)

1
2
3
4
5
6
7
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
序号 参数名称 参数解释 春运形象说明
1 corePoolSize 表示常驻核心线程数,如果大于0,即使本地任务执行完也不会被销毁 日常固定的列车数辆(不管是不是春运,都要有固定这些车次运行)
2 maximumPoolSize 表示线程池能够容纳可同时执行的最大线程数 春运客流量大,临时加车,加车后,总列车次数不能超过这个最大值,否则就会出现调度不开等问题
3 keepAliveTime 表示线程池中线程空闲的时间,当空闲时间达到该值时,线程会被销毁,只剩下 corePoolSize 个线程位置 春运压力过后,临时的加车(如果空闲时间超过keepAliveTime)就会被撤掉,只保留日常固定的列车车次数量用于日常运营
4 unit keepAliveTime 的时间单位,最终都会转换成【纳秒】,因为CPU的执行速度杠杠滴 keepAliveTime 的单位,春运以【天】为计算单位
5 workQueue 当请求的线程数大于 maximumPoolSize 时,线程进入该阻塞队列 春运压力异常大,即便加车后(达到maximumPoolSize)也不能满足要求,所有乘坐请求都会进入该阻塞队列中排队
6 threadFactory 顾名思义,线程工厂,用来生产一组相同任务的线程,同时也可以通过它增加前缀名,虚拟机栈分析时更清晰 比如(北京——上海)就属于该段列车所有前缀,表明列车运输职责
7 handler 执行拒绝策略,当 workQueue 达到上限,就要通过这个来处理,比如拒绝,丢弃等,这是一种限流的保护措施 workQueue排队也达到队列最大上线,就要提示无票等拒绝策略了,因为我们不能加车了,当前所有车次已经满负载
  • 上表来源于https://dayarch.top/p/why-we-need-to-use-threadpool.html

corePoolSize(线程池的基本大小)

  • 提交一个任务到线程池时,先判断当前运行的线程数量是否小于corePoolSize,若小于线程池会创建一个新的线程来执行任务

    • 注意:即使有空闲的基本线程能执行该任务,也会创建新的线程.
  • 如果线程池中的线程数已经大于或等于corePoolSize,则不会创建新的线程.

  • 如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程.

maximumPoolSize(线程池的最大数量)

线程池允许创建的最大线程数.

  • 阻塞队列已满,线程数小于maximumPoolSize便可以创建新的线程执行任务.
  • 如果使用无界的阻塞队列,该参数没有什么效果
  • 决定线程池最多可以创建的多少线程

workQueue(工作队列)

用于保存等待执行的任务的阻塞队列.

如果使用的阻塞队列为无界队列,则永远不会调用拒绝策略,因为再多的任务都可以放在队列中

ArrayBlockingQueue
  • 基于数组结构的有界阻塞队列,按FIFO(先进先出)原则对任务进行排序.
  • 使用该队列,线程池中能创建的最大线程数为maximumPoolSize.
  • 维护两个整形变量,标识队列头尾在数组中的位置,在生产者放入和消费者获取数据共用一个锁对象,意味着两者无法真正的并行运行,性能较低.
LinkedBlockingQueue
  • 基于链表结构的无界阻塞队列,按FIFO(先进先出)原则对任务进行排序,吞吐量高于ArrayBlockingQueue.
  • 静态工厂方法Executor.newFixedThreadPool()使用了这个队列.
  • 特别适合于生产者-消费者模型中的线程通信和任务调度等场景
  • LinkedBlockingQueueArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(), take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer().
SynchronousQueue
  • 一个不存储元素的阻塞队列
  • 添加任务的操作必须等到另一个线程的移除操作,否则添加操作一直处于阻塞状态.静态工厂方法 Executor.newCachedThreadPool()使用了这个队列.
  • 适用于一些需要线程之间进行协作的场景
    • 线程池中的任务调度:线程池中的任务在执行时需要获取任务数据,如果任务数据需要由其他线程提供,则可以使用SynchronousQueue进行线程之间的数据交换.
    • 生产者-消费者模型:在一些需要在多个线程之间传递数据的生产者-消费者模型中,SynchronousQueue可以用来在生产者线程和消费者线程之间传递数据.
PriorityBlokingQueue
  • 一个支持优先级的无界阻塞队列.
  • 使用该队列,线程池中能创建的最大线程数为corePoolSize.
DelayedWorkQueue
  • 支持延时获取元素的无界阻塞队列
  • 创建元素时可以指定多久之后才能从队列中获取元素,常用于缓存系统或定时任务调度系统
LinkedTransferQueue
  • 一个由链表结构组成的无界阻塞队列,与LinkedBlockingQueue相比多了transfer和tryTranfer方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者.
LinkedBlockingDeque
  • 一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素.

keepAliveTime(线程活动保持时间)

  • 线程池的工作线程空闲后,保持存活的时间.如果任务多而且任务的执行时间比较短,可以调大keepAliveTime,提高线程的利用率.
  • 设置线程空闲时间,和空闲时间的单位,当线程闲置超过空闲时间就会被销毁。可以通过 allowCoreThreadTimeOut 方法来允许核心线程被回收。

unit(线程活动保持时间的单位)

  • 可选单位有DAYS、HOURS、MINUTES、毫秒、微秒、纳秒.

threadFactory(构建线程的工厂类)

  • 可以为线程创建时起个名字
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.holelin.sundry.demo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executors;

@Slf4j
public class MyThreadFactory implements ThreadFactory {

private int counter;
private final String namePrefix;
private final ThreadGroup threadGroup;

public MyThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
threadGroup = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
counter = 1;
}

public Thread newThread(Runnable r) {
Thread t = new Thread(threadGroup, r, namePrefix + "-thread-" + counter);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
counter++;
return t;
}

public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory("MyThreadFactory");
final ExecutorService service = Executors.newFixedThreadPool(10, threadFactory);
for (int i = 0; i < 10; i++) {
service.submit(() -> log.info("test"));

}
}
}

handler(饱和策略,或者又称拒绝策略)

当阻塞队列已满并且达到最大线程数时,再提交任务会走拒绝策略流程,JDK 提供了拒绝策略顶层接口 RejectedExecutionHandler,所有拒绝策略都需要继承该接口,JDK 内置了四种拒绝策略

AbortPolicy
  • 线程池默认的拒绝策略,触发时会抛出RejectedExecutionException 异常.
  • 如果是一些比较重要的业务,可以使用该拒绝策略,在系统不能进一步支持更大并发量的情况下通过抛出异常及时发现问题并进行处理.
CallerRunsPolicy
  • 在线程池没关闭的情况下,由调用者线程去处理任务,反之直接丢弃.
  • 此拒绝策略追求任务都能被执行,不丢失,比较适合并发量不大并且不允许丢失任务的场景场景,性能较低.
DiscardPolicy
  • 丢弃任务,不抛出异常,一般无感知.建议一些无关紧要的任务可以使用此策略.
DiscardOldestPolicy
  • 丢弃队列中最老的任务,然后重新提交被拒绝的任务.需要根据业务场景进行选择是否要用.

线程池大小的选择策略

1
CPU核数 = Runtime.getRuntime().availableProcessors()
  • CPU密集型

    • CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高.

    • 最佳线程数 = CPU核数 + 1

      《Java并发编程实战》这么说:

      计算(CPU)密集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停,刚好有一个“额外”的线程,可以确保在这种情况下CPU周期不会中断工作。

  • IO密集型

    • IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高
    • 最佳线程数 = (1/CPU利用率) = 1 + (I/O耗时/CPU耗时)
      • CPU利用率 = CPU耗时/(CPU耗时+IO耗时)

线程等待时间所占比例越高,需要越多线程;线程CPU时间所占比例越高,需要越少线程。

阿姆达尔定律

S=1(1p)+(pn)S=\frac{1}{(1-p)+(\frac{p}{n} )}

  • nn: CPU核心数
  • pp: 程序并行百分比
  • 1p1-p: 程序串行百分比
  • 假设$n\to \infty $,1p1-p​=5%,那S的极限就是20

Tips: 临界区都是串行的,非临界区都是并行的,用单线程执行临界区的时间/用单线程执行(临界区+非临界区)的时间就是串行百分比

向线程池提交任务

  • 使用ThreadPoolExecutor#execute()方法来提交任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThreadPoolExecutorTest {
private static AtomicInteger threadId = new AtomicInteger(0);
private static final int CORE_POOL_SIZE = 5;
private static final int MAXIMUM_POOL_SIZE = 10;
private static final long KEEP_ALIVE_TIME = 10;

public static void main(String[] args) {
// 手动创建线程池
// 创建有界阻塞队列
ArrayBlockingQueue<Runnable> runnable = new ArrayBlockingQueue<Runnable>(10);
// 创建线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "working_thread_" + threadId.getAndIncrement());
return thread;
}
};

// 手动创建线程池
// 拒绝策略采用默认策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, runnable, threadFactory);

for (int i = 0; i < 20; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}

线程池的五种运行状态

ctl

  • ctlThreadPoolExecutor中的一个原子变量,用于表示线程池的状态和线程数量信息.

  • ctl的作用在于记录线程池的状态和线程数量,以支持线程池的动态管理.例如,当有新任务提交时,线程池会根据当前状态和线程数量来判断是否需要创建新的线程来执行任务;当线程池被关闭时,线程池会根据当前状态和线程数量来判断是否需要中断正在执行的任务.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 线程池状态
// runState is stored in the high-order bits
// RUNNING 高3位为111
private static final int RUNNING = -1 << COUNT_BITS;

// SHUTDOWN 高3位为000
private static final int SHUTDOWN = 0 << COUNT_BITS;

// 高3位 001
private static final int STOP = 1 << COUNT_BITS;

// 高3位 010
private static final int TIDYING = 2 << COUNT_BITS;

// 高3位 011
private static final int TERMINATED = 3 << COUNT_BITS;
  • RUNNING:该状态的线程池既能接受新提交的任务,又能处理阻塞队列中任务.
  • SHUTDOWN:该状态的线程池不能接收新提交的任务,但是能处理阻塞队列中的任务.(政府服务大厅不在允许群众拿号了,处理完手头的和排队的政务就下班.)
    • 处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态.
    • 注意:finalize() 方法在执行过程中也会隐式调用shutdown()方法.
  • STOP:该状态的线程池不接受新提交的任务,也不处理在阻塞队列中的任务,还会中断正在执行的任务.(政府服务大厅不再进行服务了,拿号、排队、以及手头工作都停止了.)
    • 在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
  • TIDYING:如果所有的任务都已终止,workerCount (有效线程数)=0 .
    • 线程池进入该状态后会调用 terminated() 钩子方法进入TERMINATED 状态.
  • TERMINATED:在terminated()钩子方法执行完后进入该状态,默认terminated()钩子方法中什么也没有做
状态名称 高3位的值 描述
RUNNING 111 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 不接受新任务,但是处理任务队列中的任务
STOP 001 中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING 010 任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED 011 终结状态
  • 线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示

    • 使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // 并不是所有平台的int都是32位.
    // 去掉前三位保存线程状态的位数,剩下的用于保存线程数量
    // 高3位为0,剩余位数全为1
    private static final int COUNT_BITS = Integer.SIZE - 3;

    // 2^COUNT_BITS次方,表示可以保存的最大线程数
    // CAPACITY 的高3位为 0
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  • 获取线程池状态、线程数量以及合并两个值的操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Packing and unpacking ctl
    // 获取运行状态
    // 该操作会让除高3位以外的数全部变为0
    private static int runStateOf(int c) { return c & ~CAPACITY; }

    // 获取运行线程数
    // 该操作会让高3位为0
    private static int workerCountOf(int c) { return c & CAPACITY; }

    // 计算ctl新值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的关闭

  • 可以通过调用线程池的shutdown或者shutdownNow方法来关闭线程池:遍历线程池中工作线程,逐个调用interrupt方法来中断线程.

shutdown方法与shutdownNow的特点

  • shutdown方法将线程池的状态设置为SHUTDOWN状态,只会中断空闲的工作线程.
  • shutdownNow方法将线程池的状态设置为STOP状态,会中断所有工作线程,不管工作线程是否空闲.
  • 调用两者中任何一种方法,都会使isShutdown方法的返回值为true;线程池中所有的任务都关闭后,isTerminated方法的返回值为true.
  • 通常使用shutdown方法关闭线程池,如果不要求任务一定要执行完,则可以调用shutdownNow方法.

线程池创建方式

  • 使用ThreadPoolExecutor类创建线程池

  • 使用Executors

    1
    2
    3
    Executors.newWorkStealingPool();
    Executors.newCachedThreadPool();
    Executors.newScheduledThreadPool(3);
  • 使用ForkJoinPool类创建线程池

    1
    2
    3
    4
    5
    new ForkJoinPool();
    new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null, true);
  • 使用ScheduledThreadPoolExecutor类创建线程池

Executors

newFixedThreadPool

  • 重用指定数目(nThreads)的线程,其背后使用的是无界的工作队列,任何时候最多有 nThreads 个工作线程是活动的.这意味着,如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 特点:

    • 核心线程数==最大线程数(没有非核心线程),因此也无需超时时间;
    • 阻塞队列是无界的,可以放任意数量的任务;
  • 使用场景: 适用与任务量已知,相对耗时的任务;

  • 注意点:

    • 线程池里的线程数量不超过corePoolSize,这导致了maximumPoolSizekeepAliveTime将会是个无用参数

    • 由于使用了无界队列, 所以FixedThreadPool永远不会拒绝, 即饱和策略失效

      • 这意味着,即使有大量的任务提交到 FixedThreadPool,线程池仍然可以保持接受任务,并按照队列中任务的顺序逐个执行。因此,在使用无界队列时,饱和策略是不会生效的
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    package com.holelin.sundry.test.thread;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FixedThreadPoolDemo {

    public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);

    executorService.execute(new Runnable() {
    public void run() {
    System.out.println(Thread.currentThread().getName() + " do plan a.");
    }
    });

    executorService.execute(new Runnable() {
    public void run() {
    System.out.println(Thread.currentThread().getName() + " do plan b.");
    }
    });

    executorService.execute(new Runnable() {
    public void run() {
    System.out.println(Thread.currentThread().getName() + " do plan c.");
    }
    });

    executorService.shutdown();
    }
    }
    // output info
    pool-1-thread-2 do plan b.
    pool-1-thread-1 do plan a.
    pool-1-thread-2 do plan c.

newCachedThreadPool

  • 是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源.其内部使用 SynchronousQueue 作为工作队列.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 使用场景

    • 适用于要求低延迟的短期任务场景
  • 特点:

    • 核心线程数为0,最大线程数为Integer.MAX_VALUE,非核心线程的生存时间为60s,意味着全都是非核心线程(60s后可以回收)
    • 非核心线程可以无限创建;
    • 队列采用SynchronousQueue实现,它没有容量,没有线程来取是放不进去的(一手交钱,一手交货);

newSingleThreadExecutor

  • 它的特点在于工作线程数目被限制为 1,操作一个无界的工作队列,所以它保证了所有任务的都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • 使用场景

    • 希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列中排队,任务执行完毕,这个唯一的线程也不会被是否;
    • 适用于需要异步执行但需要保证任务顺序的场景
  • 区别:

    • 自己创建一个单线程串行执行任务,如果任务执行失败而终止没有任何补救措施,而线程池还会新建一个线程,保证线程池的正常工作;
    • ExecutorService newSingleThreadExecutor()线程个数为1,不能修改;
      • FinalizableDelegatedExecutorService 应用的是装饰器模式模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法;
    • ExecutorService newFixedThreadPool(1)初始时为1,以后还可以修改;
      • 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    package com.holelin.sundry.test.thread;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SingleThreadDemo {

    public static void main(String[] args) {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    executorService.execute(new Runnable() {
    public void run() {
    System.out.println(Thread.currentThread().getName() + " do plan a.");
    }
    });

    executorService.execute(new Runnable() {
    public void run() {
    System.out.println(Thread.currentThread().getName() + " do plan b.");
    }
    });

    executorService.execute(new Runnable() {
    public void run() {
    System.out.println(Thread.currentThread().getName() + " do plan c.");
    }
    });

    executorService.shutdown();
    }
    }
    // output info
    pool-1-thread-1 do plan a.
    pool-1-thread-1 do plan b.
    pool-1-thread-1 do plan c.

newScheduledThreadPool newSingleThreadScheduledExecutor

  • newSingleThreadScheduledExecutor()newScheduledThreadPool(int corePoolSize),创建的是个 ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
1
2
3
4
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    package com.holelin.sundry.test.thread;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ScheduledThreadPoolDemo {

    public static long fixedRateInterval = 0;
    public static long lastFixedRateRunTime = System.nanoTime();

    public static long withFixedInterval = 0;
    public static long lastWithFixedRunTime = System.nanoTime();

    public static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

    //1秒后执行,只执行1次
    executorService.schedule(new Runnable() {
    public void run() {
    System.out.println(Thread.currentThread().getName() + " do plan a.");
    }
    }, 1, TimeUnit.SECONDS);

    //1秒后开始执行,每间隔3秒执行一次,这个间隔时间是从上一个任务【执行开始时间】算起
    lastFixedRateRunTime = System.nanoTime();
    executorService.scheduleAtFixedRate(new Runnable() {
    public void run() {
    long runTime = System.nanoTime();
    fixedRateInterval = (runTime - lastFixedRateRunTime) / 1_000_000_000;
    lastFixedRateRunTime = runTime;

    //模拟任务执行
    //这个休眠时间不影响下次执行间隔时间的计算,执行间隔是3秒(当任务执行耗时小于3秒时,如果大于3秒了,则间隔为任务的执行耗时)
    sleep(2);
    System.out.println(
    Thread.currentThread().getName() + " do plan b. Interval:" + fixedRateInterval + "s");
    }
    }, 1, 3, TimeUnit.SECONDS);

    //两秒后执行任务,每次任务之间间隔3秒,这个间隔时间是从上一个任务【执行结束时间】算起
    lastWithFixedRunTime = System.nanoTime();
    executorService.scheduleWithFixedDelay(new Runnable() {
    public void run() {
    long runTime = System.nanoTime();
    withFixedInterval = (runTime - lastWithFixedRunTime) / 1_000_000_000;
    lastWithFixedRunTime = runTime;

    //模拟任务执行
    //这个休眠时间会影响下次执行间隔时间的计算,执行间隔是2秒加上本次运行时间
    sleep(2);
    System.out.println(
    Thread.currentThread().getName() + " do plan c. Interval:" + withFixedInterval + "s");
    }
    }, 2, 3, TimeUnit.SECONDS);


    }

    public static void sleep(long millis) {
    try {
    TimeUnit.SECONDS.sleep(millis);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    // output info
    pool-1-thread-1 do plan a.

    pool-1-thread-2 do plan b. Interval:1s

    pool-1-thread-3 do plan c. Interval:2s

    pool-1-thread-2 do plan b. Interval:3s
    pool-1-thread-2 do plan b. Interval:2s

    pool-1-thread-1 do plan c. Interval:5s

    pool-1-thread-2 do plan b. Interval:2s

    pool-1-thread-3 do plan c. Interval:5s

    pool-1-thread-1 do plan b. Interval:3s
    pool-1-thread-1 do plan b. Interval:2s

    pool-1-thread-2 do plan c. Interval:5s

    pool-1-thread-1 do plan b. Interval:3s

newWorkStealingPool

  • 其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序.
1
2
3
4
5
6
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

Executors返回的线程池对象的弊端

  • FixedThreadPoolSingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM.
  • CachedThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM.

总结

参数 FixedThreadPool CachedThreadPool ScheduledThreadPool SingleThreadExcutor SingleThreadScheduledExecutor
corePoolSize 构造函数传入 0 构造函数传入 1 1
maxPoolSize 构造函数传入 Integer.MAX_VALUE Integer.MAX_VALUE 1 Integer.MAX_VALUE
keepAliveTime 0 60秒 0 0 0
workQueue LinkedBlockingQueue SynchronousQueue DelayedWorkQueue LinkedBlockingQueue DelayedWorkQueue

在使用线程池的过程中遇到过哪些坑或者需要注意的地方

  • OOM 问题.刚开始使用线程都是通过 Executors 创建的

  • 任务执行异常丢失问题.可以通过下述4种方式解决

    1. 在任务代码中增加 try、catch 异常处理
    2. 如果使用的 Future 方式,则可通过 Future 对象的 get 方法接收抛出的异常
    3. 为工作线程设置 setUncaughtExceptionHandler,在 uncaughtException 方法中处理异常
    4. 可以重写 afterExecute(Runnable r, Throwable t) 方法,拿到异常 t
  • 共享线程池问题.整个服务共享一个全局线程池,导致任务相互影响,耗时长的任务占满资源,短耗时任务得不到执行.同时父子线程间会导致死锁的发生,进而导致 OOM.

  • 跟 ThreadLocal 配合使用,导致脏数据问题.我们知道 Tomcat 利用线程池来处理收到的请求,会复用线程,如果我们代码中用到了 ThreadLocal,在请求处理完后没有去 remove,那每个请求就有可能获取到之前请求遗留的脏值.

  • ThreadLocal 在线程池场景下会失效,ThreadLocal 在线程池场景下会有一些问题.线程池中的线程是可以被重用的,当一个线程重新被分配一个任务时,如果之前使用的 ThreadLocal 对象没有被清空,就可能导致数据出现污染.可以考虑用阿里开源的 Ttl 来解决.

  • 需要自定义线程工厂指定线程名称,不然发生问题都不知道咋定位.

Spring中的线程池

  • Spring 通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用ThreadPoolTaskExecutor实现一个基于线程池的TaskExecutor,还得需要使用@EnableAsync开启异步,并通过在需要的异步方法那里使用注解@Async声明是一个异步任务

  • Spring 已经实现的异常线程池:

    • SimpleAsyncTaskExecutor: 不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程.
    • SyncTaskExecutor: 这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方.
    • ConcurrentTaskExecutor: Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类.
    • SimpleThreadPoolTaskExecutor: 是QuartzSimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类.
    • ThreadPoolTaskExecutor : 最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装.
  • org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
    implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

    private final Object poolSizeMonitor = new Object();

    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;

    private int keepAliveSeconds = 60;

    private int queueCapacity = Integer.MAX_VALUE;

    private boolean allowCoreThreadTimeOut = false;

    @Nullable
    private TaskDecorator taskDecorator;

    @Nullable
    private ThreadPoolExecutor threadPoolExecutor;

    // Runnable decorator to user-level FutureTask, if different
    private final Map<Runnable, Object> decoratedTaskMap =
    new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
    //...略
    }
  • 配置类中方法说明:

    • Spring 中的ThreadPoolExecutor是借助JDK并发包中的java.util.concurrent.ThreadPoolExecutor来实现的。其中一些值的含义如下:

    • int corePoolSize:线程池维护线程的最小数量

    • int maximumPoolSize:线程池维护线程的最大数量,线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。

    • long keepAliveTime:空闲线程的存活时间TimeUnit

    • unit:时间单位,有纳秒,微秒,毫秒,秒

    • BlockingQueue workQueue:持有等待执行的任务队列,一个阻塞队列,用来存储等待执行的任务,当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待

    • RejectedExecutionHandler handler 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。

    • 当任务添加到线程池中之所以被拒绝,可能是由于: 第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。

      • Reject策略预定义有四种:
        • ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
        • ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
        • ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃.
        • ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如
          果再次失败,则重复此过程)
        • 自定义策略: 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Bean
    public ThreadPoolTaskExecutor executor(){
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(50);
    executor.setKeepAliveSeconds(5 * 60);
    executor.setQueueCapacity(1000);
    // 自定义实现拒绝策略
    executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("当前任务线程池队列已满."));
    // 线程名称前缀
    executor.setThreadNamePrefix("Business-");
    executor.initialize();
    // 告诉线程池,在销毁之前执行shutdown方法
    executor.setWaitForTasksToCompleteOnShutdown(true);
    // shutdown\shutdownNow 之后等待3秒
    executor.setAwaitTerminationSeconds(3);
    return executor;
    }