参考文献

并发工具类

  • 提供了比synchronized更加高级的各种同步结构,包括CountDownLatchCyclicBarrierSemaphore等,可以实现更加丰富的多线程操作,比如利用Semaphore作为资源控制器,限制同时进行工作的线程数量.
  • 各种线程安全的容器,比如最常见的ConcurrentHashMap、有序的ConcunrrentSkipListMap,或者通过类似快照机制,实现线程安全的动态数组CopyOnWriteArrayList等.
  • 各种并发队列实现,如各种BlockedQueue实现,比较典型的ArrayBlockingQueueSynchorousQueue或针对特定场景的PriorityBlockingQueue等.
  • 强大的Executor框架,可以创建各种不同类型的线程池,调度任务运行等,绝大部分情况下,不再需要自己从头实现线程池和任务调度器.
img

CountDownLatch(等待多个线程完成)

  • CountDownLatch允许一个或多个线程等待,直到其他线程都操作完成.一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成.
  • 倒数计数器,当计数器>0,所有线程都会等待,进入阻塞;当计数器为0,则释放所有被阻塞的线程.
  • CountDownLatch用给定的计数初始化.由于调用了countDown()方法,await()方法会阻塞直到当前计数为零,之后所有等待的线程都会被释放,任何后续的await()调用都会立即返回.这是一种一次性现象——计数无法重置.如果需要重置计数的版本,可以考虑使用CyclicBarrier
使用场景
  • CountDownLatch:例如应用场景是某个线程在开始运行前需要等待多个线程都做完各自的任务之后才能运行.

    • 用法一: 一个线程等待其他多个线程都执行完毕,再继续自己的工作

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      public static void main(String[] args) throws InterruptedException {
      CountDownLatch countDownLatch = new CountDownLatch(5);
      ExecutorService executorService = Executors.newFixedThreadPool(5);
      for (int i = 0; i < 5; i++) {
      final int no = i + 1;
      Runnable runnable = new Runnable() {
      @Override
      public void run() {

      try {
      Thread.sleep((long) Math.random() * 10000);
      System.out.println(no + "号运动员完成了比赛");

      } catch (InterruptedException e) {
      e.printStackTrace();
      } finally {
      countDownLatch.countDown();
      }
      }
      };
      executorService.submit(runnable);
      }
      System.out.println("等待5个运动员都跑完了....");
      countDownLatch.await();
      System.out.println("所有人都跑完了,比赛结束");
      executorService.shutdown();
      }
      1
      2
      3
      4
      5
      6
      7
      等待5个运动员都跑完了....
      3号运动员完成了比赛
      4号运动员完成了比赛
      1号运动员完成了比赛
      2号运动员完成了比赛
      5号运动员完成了比赛
      所有人都跑完了,比赛结束
    • 用法二: 多个线程等待某一个线程的信号,同时开始执行

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      public static void main(String[] args) throws InterruptedException {
      System.out.println("运动员有5秒的准备时间");
      CountDownLatch countDownLatch = new CountDownLatch(1);
      ExecutorService executorService = Executors.newFixedThreadPool(5);
      for (int i = 0; i < 5; i++) {
      final int no = i + 1;
      Runnable runnable = new Runnable() {
      @Override
      public void run() {
      System.out.println(no + "号运动员准备完毕,等待裁判员的发令枪");
      try {
      countDownLatch.await();
      System.out.println(no + "号运动员开始跑步了");
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      };
      executorService.submit(runnable);
      }
      Thread.sleep(5000);
      System.out.println("5秒这边时间已过,发令枪响,比赛开始!");
      countDownLatch.countDown();
      executorService.shutdown();
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      运动员有5秒的准备时间
      1号运动员准备完毕,等待裁判员的发令枪
      4号运动员准备完毕,等待裁判员的发令枪
      5号运动员准备完毕,等待裁判员的发令枪
      2号运动员准备完毕,等待裁判员的发令枪
      3号运动员准备完毕,等待裁判员的发令枪
      5秒这边时间已过,发令枪响,比赛开始!
      4号运动员开始跑步了
      2号运动员开始跑步了
      5号运动员开始跑步了
      1号运动员开始跑步了
      3号运动员开始跑步了
  • 当使用计数器为1的CountDownLatch,可以做为简单的开关来使用,如可以作为启动信号来使用.

  • 当使用计数器为N的CountDownLatch,可以用来让一个线程等待,直到N个线程完成某个操作,或者某个操作完成N次.如将一个大任务拆分为N个小任务,等所有小任务都执行完成后,进行汇总操作.

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.holelin.sundry.test.thread;

import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/10/9 10:16
* @UpdateUser: HoleLin
* @UpdateDate: 2022/10/9 10:16
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class CountDownLatchTest {

@Test
public void testCountDownLatchByOne() throws InterruptedException {
final int N = 5;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
final ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < N; i++) {
executorService.execute(new TestOneWorker(startSignal, doneSignal));
}
doPrepareSomething();
startSignal.countDown();
doneSignal.await();
System.out.println("all work finish ");
}

class TestOneWorker implements Runnable {

private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;

public TestOneWorker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}

@Override
public void run() {
try {
startSignal.await();
doWorker();
doneSignal.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

void doWorker() {
System.out.println("do work");
ThreadUtil.sleep(2, TimeUnit.SECONDS);
System.out.println("work done");
}
}


@Test
public void testCountDownLatchByN() throws InterruptedException {
final int N = 5;
CountDownLatch doneSignal = new CountDownLatch(N);
final ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < N; i++) {
executorService.execute(new TestNWorker(doneSignal, i));
}
doneSignal.await();
System.out.println("all work finish");
}

class TestNWorker implements Runnable {

private final CountDownLatch doneSignal;
private final int i;

public TestNWorker(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}

@Override
public void run() {
doWorker(i);
doneSignal.countDown();
}

void doWorker(int i) {
System.out.println(i + " wil be do work");
ThreadUtil.sleep(2, TimeUnit.SECONDS);
System.out.println(i + " work done");
}
}

void doPrepareSomething() {
System.out.println("do prepare something");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// test one output info
do prepare something
do work
do work
do work
do work
do work
work done
work done
work done
work done
work done
all work finish

// test N output info
3 wil be do work
4 wil be do work
0 wil be do work
2 wil be do work
1 wil be do work
4 work done
0 work done
2 work done
3 work done
1 work done
all work finish

CyclicBarrier(同步屏障)

  • 一种同步辅助,它允许一组线程互相等待到达一个公共障碍点.Cyclicbarrier在包含固定数量线程的程序中非常有用,这些线程必须偶尔相互等待.这个屏障被称为循环的,因为它可以在等待线程被释放后被重用.
  • CyclicBarrier支持可选的Runnable命令,该命令在每个barrier点上运行一次,在队列中的最后一个线程到达之后,但在任何线程被释放之前.此屏障操作用于在任何一方继续之前更新共享状态.
  • CyclicBarrier对失败的同步尝试使用全有或全无中断模型:如果线程由于中断、故障或超时而过早离开屏障点,则在该屏障点等待的所有其他线程也会通过 BrokenBarrierException(或 InterruptedException如果他们也几乎同时被打断).
  • 内存一致性效果:在调用 await()之前线程中的操作发生在作为屏障操作的一部分的操作之前,而这些操作又在从其他线程中的相应 await() 成功返回之后发生的操作.
1
2
3
4
5
6
7
8
9
10
11
// CyclicBarrier的构造函数有2个参数,第1个参数是声明有几个部件需要制造(有几件事可以并行),第2个参数是一个Runnable,此参数代表并行完之后要做的最后一件事情.
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
this(parties, null);
}
使用场景
  • CyclicBarrier:例如应用场景是多个线程互相等待至某个状态后再同时执行各自的任务. 可用于多线程计算数据,最后合并计算结果.
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.holelin.sundry.test.thread;

import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/10/9 11:53
* @UpdateUser: HoleLin
* @UpdateDate: 2022/10/9 11:53
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class CyclicBarrierTest {


public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new AssembleComputerWorkshop());
final ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new CpuWorkshop(cyclicBarrier));
executorService.execute(new MainboardWorkshop(cyclicBarrier));
executorService.execute(new MemoryWorkshop(cyclicBarrier));
}
}

interface Producible {

void produce();
}

class CpuWorkshop implements Runnable, Producible {

private CyclicBarrier cyclicBarrier;

public CpuWorkshop(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
while (true) {
try {
System.out.println("start produce CPU");
ThreadUtil.sleep(2, TimeUnit.SECONDS);
produce();
this.cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}

@Override
public void produce() {
System.out.println("produce CPU");
}
}

class MainboardWorkshop implements Runnable, Producible {

private CyclicBarrier cyclicBarrier;

public MainboardWorkshop(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void produce() {
System.out.println("produce Main board");

}

@Override
public void run() {
while (true) {
try {
System.out.println("start produce Main board");
ThreadUtil.sleep(2, TimeUnit.SECONDS);
produce();
this.cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
}

class MemoryWorkshop implements Runnable, Producible {

private CyclicBarrier cyclicBarrier;

public MemoryWorkshop(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void produce() {
System.out.println("produce memory");

}

@Override
public void run() {
while (true) {
try {
System.out.println("start produce memory");
ThreadUtil.sleep(2, TimeUnit.SECONDS);
produce();
this.cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
}

class AssembleComputerWorkshop implements Runnable {

@Override
public void run() {
System.out.println("start Assemble Computer");
ThreadUtil.sleep(2, TimeUnit.SECONDS);

}
}
CyclicBarrierCountDownLatch的区别
  • 作用对象不同: CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch只需等待数字倒数到 0,也就是说 CountDownLatch作用于事件,但 CyclicBarrier作用于线程;CountDownLatch是在调用了 countDown 方法之后把数字倒数减 1,而 CyclicBarrier是在某线程开始等待后把计数减 1.
  • 可重用性不同: CountDownLatch在倒数到 0 并且触发门闩打开后,就不能再次使用了,除非新建一个新的实例;而 CyclicBarrier可以重复使用.CyclicBarrier还可以随时调用 reset 方法进行重置,如果重置时有线程已经调用了 await 方法并开始等待,那么这些线程则会抛出 BrokenBarrierException 异常.
  • 执行动作不同: CyclicBarrier有执行动作 barrierAction,而 CountDownLatch没这个功能.

Semaphore(控制并发线程数)

  • Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源.
使用场景
  • 流量控制,特别是公用资源有限的应用场景.控制一次多少线程可以进行操作.常用于池化技术中控制并发.
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.holelin.sundry.test.thread;

import cn.hutool.core.thread.ThreadUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* @Description:
* @Author: HoleLin
* @CreateDate: 2022/10/9 13:55
* @UpdateUser: HoleLin
* @UpdateDate: 2022/10/9 13:55
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class SemaphoreTest {

public static void main(String[] args) {
final int N = 30;
ExecutorService executor = Executors.newFixedThreadPool(N);
// 控制10个线程一批
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < N; i++) {
executor.execute(() -> {
try {
semaphore.acquire();
System.out.println("处理数据中......");
ThreadUtil.sleep(3, TimeUnit.SECONDS);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}

});
}
executor.shutdown();
}
}

Exchanger(线程间交换数据)

  • Exchanger(交换者)是一个用于线程间协作的工具类.

  • Exchanger用于进行线程间的数据交换.它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据.这两个线程通过exchange()方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.holelin.sundry.test.thread;

import java.util.concurrent.Exchanger;

public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
try {
String data1 = "Thread1 data";
String data2 = exchanger.exchange(data1);
System.out.println("Thread1 received: " + data2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
String data2 = "Thread2 data";
String data1 = exchanger.exchange(data2);
System.out.println("Thread2 received: " + data1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
// output
Thread2 received: Thread1 data
Thread1 received: Thread2 data

Phaser

  • Phaser是Java SE 7中引入的一种线程同步机制,可以用于多个线程之间协调和同步.Phaser的作用类似于CyclicBarrierCountDownLatch,但是它更加灵活和强大,可以支持更复杂的线程同步场景.
  • Phaser可以将多个线程分为多个阶段,每个阶段都可以定义一个任务,在所有线程完成当前阶段的任务后,Phaser会自动进入下一个阶段.这样,可以实现多个线程之间的分阶段同步和协调,大大简化了复杂多线程程序的开发和维护.
  • Phaser的主要作用如下:
    1. 分阶段执行任务: Phaser可以将多个线程分为多个阶段,并在每个阶段执行相应的任务.每个阶段的任务可以是相同的,也可以是不同的.当所有线程完成当前阶段的任务后,Phaser会自动进入下一个阶段.
    2. 动态增加和删除参与者: Phaser允许动态增加和删除参与者,这意味着可以在程序运行过程中动态地调整线程的数量.这种灵活性可以帮助我们更好地应对多线程程序中的变化和不确定性.
    3. 等待其他线程执行完毕: Phaser可以让线程等待其他线程执行完毕后再继续执行.这个功能类似于CountDownLatch,但是更加灵活和强大,可以支持更复杂的同步场景.
    4. 实现多线程并发控制: Phaser可以帮助我们实现多线程并发控制,比如限制并发线程的数量、协调多个线程的执行顺序等.

StampedLock