参考文献

  • <<Java并发编程艺术>>
  • <<深入理解Java虚拟机-JVM高级特性与最佳实践>>

线程

img

线程的创建和运行

  • 直接使用Thread

    1
    Thread t= new Thread(()->{}).start();
  • Runnable配合Thread

    1
    2
    3
    Runnable runnable = () -> log.debug("hellow");
    Thread t = new Thread(runnable);
    t.start;
  • 实现Callable接口通过FutureTask包装器来创建Thread线程;

    1
    2
    3
    4
    5
    6
    7
    8
    // FutureTask能接收Callable类型的参数,用来处理返回结果
    FutureTask<String> task = new FutureTask<>(()->{
    log.debug("hello");
    return "demo";
    })
    new Thread(task,"t").start();
    // 主线程阻塞,同步等待task执行完毕的结果
    String result = task.get();
  • 使用ExecutorServiceCallableFuture实现有返回结果的多线程(也就是使用了ExecutorService来管理前面的三种方式)

创建一个线程的过程

  1. 它为一个线程栈分配内存,该栈为每个线程方法调用保存一个栈帧
  2. 每一栈帧由一个局部变量数组、返回值、操作数堆栈和常量池组成
  3. 一些支持本机方法的JVM也会分配一个本机堆栈
  4. 每个线程获得一个程序计数器,告诉它当前处理器执行的指令是什么
  5. 系统创建一个与Java线程对应的本机线程
  6. 将与线程相关的描述符添加到JVM内部数据结构中
  7. 线程共享堆和方法区域

线程状态

img

  • 上图来自于https://dayarch.top/p/java-thread-life-cycle.html

线程的六种状态

线程状态 说明
初始状态(NEW) 仅是在语言层面上创建了线程对象,还未与操作系统关联;新创建了一个线程对象.
运行(RUNNABLE) Java线程中将就绪(READY)和运行中(RUNNING)两种状态笼统的称为“运行”.
线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法.该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(READY)
就绪状态的线程在获得CPU时间片后变为运行中状态(RUNNING).
阻塞状态(BLOCKED) 如果调用阻塞API,此时该线程实际上不会使用CPU会导致线程切换上下文进入阻塞状态,等BIO操作完毕后会由操作系统唤醒阻塞线程,状态由阻塞状态转变为就绪状态;
与就绪状态的区别是,对于阻塞状态的线程来说,只要它们一直不被唤醒,CPU就一直不会考虑调度它们;
等待(WAITING) 进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)
超时等待(TIMED_WAITING) 该状态不同于WAITING,它可以在指定的时间后自行返回.
终止状态(TERMINATED) 表示线程已经执行完毕,生命周期已经结束,不会转变为其他状态了.

线程状态转换

img

img

img

  • 上图来自于https://dayarch.top/p/java-thread-life-cycle.html
  • 注: 上图有个错误进入WAITING状态时,Object.join(),应该是Thread.join()

线程中断

  • Java线程的打断标记是通过一个boolean类型的标志位来实现的,默认情况下该标志位的初始值为false,表示线程未被打断.

  • 中断是指一个线程(通常是主线程)通过interrupt()方法向另一个线程(被中断线程)发出请求,要求另一个线程立即停止正在执行的工作,转而去做其他事情.被中断线程可以随时判断是否被中断,并做出相应的处理.

    • Java提供了一种用于停止线程的协商机制——中断,即中断标识协商机制.中断只是一种协商机制,有其他线程或者自己调用线程的interrupt方法.希望被调用的线程中断,该方法仅仅将线程对象中的中标标识设置为true
  • 线程通过检查自身是否被中断来进行相应,线程通过方法isInterrupted()来进行判断是否被中断,也可以调用静态方法Thread.interrupted()方法返回当前线程中断标记位并对当前线程中断标记位来进行复位(true-->false).

    • The interruption status of any Thread can be inspected using method isInterrupted.
      This method returns true if the thread has been interrupted via the interrupt method
      but the status has not since been reset either by the thread invoking
      Thread.interrupted (see § 1.1.2.5) or in the course of wait, sleep, or join
      throwing InterruptedException

      – Concurrent Programming in Java™: Design Principles and Patterns, Second Edition

    • 如果线程通过interrupt方法被中断但该状态尚未被线程通过Thread.interrupted或在等待、休眠或加入过程中抛出InterruptedException的情况下重置,则该方法返回true.

  • 中断一个线程,其本意是给这个线程一个通知信号,会影响这个线程内部的一个中断标识位.这个线程本身并不会因此而改变状态(如阻塞,终止等).

    1. 调用interrupt()方法并不会中断一个正在运行的线程.也就是说处于Running状态的线程并不会因为被中断而被终止,仅仅改变了内部维护的中断标识位而已.
    2. 若调用sleep()而使线程处于TIMED-WATING 状态,这时调用interrupt()方法,会抛出
      InterruptedException,从而使线程提前结束TIMED-WATING状态
    3. Java的API中可以看到,许多声明抛出InterruptedException的方法,这些方法在抛出InterruptedException之前,Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,此时调用isInterrupted()方法将会返回false
    4. 中断状态是线程固有的一个标识位,可以通过此标识位安全的终止线程.比如 ,想终止一个线程thread 的时候,可以调用thread.interrupt()方法,在线程的run方法内部可以根据thread.isInterrupted()的值来优雅的终止线程

使用中断机制两个原则

原则-1

如果遇到的是可中断的阻塞方法, 并抛出InterruptedException,可以继续向方法调用栈的上层抛出该异常;如果检测到中断,则可清除中断状态并抛出InterruptedException,使当前方法也成为一个可中断的方法

原则-2

若有时候不太方便在方法上抛出InterruptedException,比如要实现的某个接口中的方法签名上没有 throws InterruptedException,这时就可以捕获可中断方法的 InterruptedException 并通过 Thread.currentThread.interrupt()来重新设置中断状态.

  • 总的来说,我们应该留意 InterruptedException,当我们捕获到该异常时,绝不可以默默的吞掉它,什么也不做,因为这会导致上层调用栈什么信息也获取不到.其实在编写程序时,捕获的任何受检异常我们都不应该吞掉

终止线程4种方式

  • 正常运行结束: 程序运行结束,线程自动结束.

  • 使用退出标志退出线程. 一般 run()方法执行完,线程就会正常结束,然而,常常有些线程是伺服线程.它们需要长时间的运行,只有在外部某些条件满足的情况下,才能关闭这些线程.使用一个变量来控制循环,例如:
    最直接的方法就是设一个 boolean 类型的标志,并通过设置这个标志为 true 或 false 来控制 while循环是否退出,代码示例:

    1
    2
    3
    4
    5
    6
    7
    8
    public class ThreadSafe extends Thread {
    public volatile boolean exit = false;
    public void run() {
    while (!exit){
    //do something
    }
    }
    }
  • interrupt方法结束线程. 使用interrupt()方法来中断线程有两种情况:

    • 线程处于阻塞状态:如使用了sleep,同步锁的 wait,socket 中的receiver,accept 等方法时, 会使线程处于阻塞状态.当调用线程的 interrupt()方法时,会抛出InterruptException 异常. 阻塞中的那个方法抛出这个异常,通过代码捕获该异常,然后 break 跳出循环状态,从而让我们有机会结束这个线程的执行.通常很多人认为只要调用interrupt() 方法线程就会结束,实际上是错的, 一定要先捕获 InterruptedException 异常之后通过break来跳出循环,才能正常结束run方法.

    • 线程未处于阻塞状态:使用 isInterrupted()判断线程的中断标志来退出循环.当使用interrupt()方法时,中断标志就会置true,和使用自定义的标志来控制循环是一样的道理.

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      public class ThreadSafe implements Runnable {
      @Override
      public void run() {
      while (!!Thread.currentThread().isInterrupted()){ //非阻塞过程中通过判断中断标志来退出
      try{
      Thread.sleep(5*1000);//阻塞过程捕获中断异常来退出
      }catch(InterruptedException e){
      e.printStackTrace();
      break;//捕获到异常之后,执行 break 跳出循环
      }
      }
      }
      }
  • stop方法终止线程(线程不安全)

线程之间的协作

Thread.join()方法

  • 如果一个线程A执行了thread.join()语句,其含义为:当前线程A等待thread线程终止之后才从thread.join()返回.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
    /**
* Waits at most {@code millis} milliseconds for this thread to
* die. A timeout of {@code 0} means to wait forever.
*
* <p> This implementation uses a loop of {@code this.wait} calls
* conditioned on {@code this.isAlive}. As a thread terminates the
* {@code this.notifyAll} method is invoked. It is recommended that
* applications not use {@code wait}, {@code notify}, or
* {@code notifyAll} on {@code Thread} instances.
*
* @param millis
* the time to wait in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
// 条件不满足,继续等待
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
````

#### `Object.wait()/Object.notify()/Object.notifyAll()`

* 调用`wait()`使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用`notify()` 或者`notifyAll() `来唤醒挂起的线程.
* 它们都属于`Object`的一部分,而不属于`Thread`.
* 只能用在同步方法或者同步控制块中使用,否则会在运行时抛出`IllegalMonitorStateExeception`.
* 使用 `wait()` 挂起期间,线程会释放锁.这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 `notify()` 或者 `notifyAll()` 来唤醒挂起的线程,造成死锁.

##### 什么时候可以使用`notify()`

1. 所有等待线程拥有相同等待条件;
2. 所有等待线程被唤醒后,执行相同操作;
3. 只需要唤醒一个线程;
4. **`notify()`的典型的应用就是线程池**

#### `wait()` 和 `sleep()`的异同

##### 相同点

* 它们都可以让线程阻塞.
* 它们都可以响应 `interrupt` 中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出 `InterruptedException` 异常.

##### 异同点

- `wait()` 是 `Object` 的方法,而 `sleep()` 是 `Thread` 的静态方法;
- `sleep`方法在持有锁时执行完毕后不会释放锁资源,`wait`在执行完毕后会释放锁资源
- 调用`wait()`需要先获取锁,`sleep`不需要
- `sleep`执行后线程所处状态为`TIMED_WATING`,会自动被唤醒,`wait`执行后线程所处状态为`WAITING`,需要手动唤醒

#### `Condition.await()/Condition.signal()/Condition.signalAll()`

* `java.util.concurrent`类库中提供了`Condition`类来实现线程之间的协调,可以在 `Condition` 上调用 `await()` 方法使线程等待,其它线程调用` signal() `或 `signalAll()` 方法唤醒等待的线程.相比于 `wait()` 这种等待方式,`await()`可以指定等待的条件,因此更加灵活.

#### 两阶段终止模式(正确的终止线程的方式)

* 利用`isInterrupted()`判断是否被打断, `interrupt()`可以打断正在执行的线程,无论整个线程是在`sleep`,`wait`还是正常运行

* `Thread.interrupt()`,通知线程中断;

* 线程内逻辑需配合响应中断:
* 正常执行循环中使用`Thread.currentThread().isInterrupted()`判断中断标识;

* 若含有`sleep()`等`Waiting`操作,会唤醒线程,抛出`InterruptedException`,抛出后中断标识会重置.对于中断异常,要么正确处理,重新设置中断标识;要么在方法上声明抛出异常以便调用方处理


```java
while (!Thread.currentThread().isInterrupted() && more work to do) {
    do more work
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void testTwoPhaseTermination() {
Thread t = new Thread(() -> {
while (true) {
Thread currentThread = Thread.currentThread();
if (currentThread.isInterrupted()) {
log.info("料理后事");
break;
}
try {
Thread.sleep(1000);
log.info("保存结果");
} catch (InterruptedException e) {
currentThread.interrupt();
}
}
});
t.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("准备终止线程");
t.interrupt();
}
  • 利用停止标记

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // 停止标记用volatile是为了保证该变量在多个线程之间的可见性
    // 例子中即主线程把它修改为true对t线程可见
    private volatile boolean isStop = false;

    @Test
    public void testTwoPhaseTermination2() {
    Thread t = new Thread(() -> {
    while (true) {
    Thread currentThread = Thread.currentThread();
    if (isStop) {
    log.info("料理后事");
    break;
    }
    try {
    Thread.sleep(1000);
    log.info("保存结果");
    } catch (InterruptedException e) {
    currentThread.interrupt();
    }
    }
    });
    t.start();
    try {
    Thread.sleep(3000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.info("准备终止线程");
    isStop = true;
    t.interrupt();
    }

等待通知机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.holelin.sundry.test.thread;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Concurrent {

static boolean flag = true;
static Object lock = new Object();

public static void main(String[] args) {
Thread waitThread = new Thread(new Wait(), "Wait Thread");
waitThread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Thread notifyThread = new Thread(new Notify(), "Notify Thread");
notifyThread.start();
}

public static String getData() {
return new SimpleDateFormat("HH:mm:ss").format(new Date());
}

static class Wait implements Runnable {
@Override
public void run() {
synchronized (lock) {
while (flag) {
try {
System.out.println(Thread.currentThread() + " flag is true,wait @ " + getData());
lock.wait();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread() + " flag is false. running @ " + getData());
}
}
}

static class Notify implements Runnable {
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock. notify @ " + getData());
lock.notifyAll();
flag = false;
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep @ " + getData());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
等待/通知的经典范式
  • 等待方遵循的原则

    1
    2
    3
    4
    5
    6
    7
    8
    9
    synchronized (对象) {
    while (条件不满足) {
    try {
    对象.wait();
    } catch (InterruptedException e) {
    }
    }
    // 对应的处理逻辑
    }
  • 通知方遵循的原则

    1
    2
    3
    4
    synchronized (对象) {
    改变条件
    对象.notifyAll();
    }

管道输入/输出流

  • 管道输入/输出流和普通的文件输入/输出流或网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存.管道输入/输出流主要包括了如下四种具体实现:
    • PipedOutputStreeam
    • PipedInputStreeam
    • PipedReader
    • PipedWriter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.holelin.sundry.test.thread;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出 IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}

static class Print implements Runnable {
private PipedReader in;

public Print(PipedReader in) {
this.in = in;
}
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}
  • 对于Piped类型的流,必须先要进行绑定,也就是调用connect()方法,如果没有将输出流和输入流进行连接,否则在使用时会抛出IOException