参考文献

AbstractQueuedSynchronizer

  • 队列同步器,简称同步器或AQS

AQS可重写的方法

方法名称 方法描述
protected boolean tryAcquire(int arg) 独占式获取同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态
protected int tryAcquireShared(int arg) 共享式获取同步状态
返回值>=0表示获取成功,反之获取失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程使用,一般该方法表示是否被当前线程独占.
  • 同步状态指的是有volatile修饰的state变量,涉及的相关方法

    方法名称 方法描述
    protected final int getState() 获取当前同步状态
    protected final void setState(int newState) 设置当前同步状态
    protected final boolean compareAndSetState(int expect, int update) 使用CAS设置当前同步状态,该方法会保证同步状态设置的原子性
  • 独占式和共享式操作state变量的区别

    1
    2
    3
    4
    5
    6
    7
    8
    9
       独占式
    state
    0---------->1(重入会+1,退出后会-1)


    共享式
    state
    0---------->N

AQS提供的模版方法

独占式/共享式 方法名称 方法描述
独占式 public final void acquire(int arg) 独占式获取同步状态,获取同步状态成功返回,失败则进入同步等待队列
同上 public final void acquireInterruptibly(int arg) acquire一样,只不过响应中断,当前线程获取同步状态失败进入等待队列;如果被中断,该方法抛出InterruptedException并返回
同上 public final boolean tryAcquireNanos(int arg, long nanosTimeout) acquireInterruptibly的基础上增加了超时限制,超时时间内获取到同步状态返回true,否则返回false
同上 public final boolean release(int arg) 独占式释放同步状态
共享式 public final void acquireShared(int arg) 共享式获取同步状态,与acquire的区别是同一时刻可以有多个线程获取同步状态
同上 public final void acquireSharedInterruptibly(int arg) acquireShared的基础上,增加了响应中断
同上 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) acquireSharedInterruptibly的基础上,增加了超时限制
同上 public final boolean releaseShared(int arg) 共享式释放同步状态

使用AQSLock实现自定义同步器和自定义锁的一般步骤

  1. 创建自定义同步器继承AbstractQueuedSynchronizer类,实现上述5个可重写方法.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Condition;

    public class MySync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
    if (acquires == 1) {
    if (compareAndSetState(0, 1)) {
    setExclusiveOwnerThread(Thread.currentThread());
    return true;
    }
    }
    return false;
    }

    @Override
    protected boolean tryRelease(int acquires) {
    if (acquires == 1) {
    if (getState() == 0) {
    throw new IllegalMonitorStateException();
    }
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
    }
    return false;
    }

    protected Condition newCondition() {
    return new ConditionObject();
    }

    @Override
    protected boolean isHeldExclusively() {
    return getState() == 1;
    }
    }
  2. 创建自定义锁,实现Lock接口,组合使用自定义同步器来实现Lock中的方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;

    public class MyLock implements Lock {
    private static MySync sync = new MySync();

    @Override
    public void lock() {
    sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
    return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
    sync.release(1);
    }

    @Override
    public Condition newCondition() {
    return sync.newCondition();
    }
    }

AQS基本思想

  • 获取锁的逻辑

    1
    2
    3
    4
    5
    6
    while(state状态不允许获取){
    if(队列中还没有此线程){
    // 入队并阻塞
    }
    }
    // 当前线程出队
  • 释放锁的逻辑

    1
    2
    3
    if(state状态允许了){
    // 回复阻塞的线程(s)
    }
  • 要点

    • 原子维护state状态
    • 阻塞以及恢复现场
    • 维护队列

state设计

  • state使用volatile配合CAS保证其修改时的原子性;

  • state使用了32bit int来维护同步状态,因为当时使用long在很多平台下测试的结果并不理想;

    1
    2
    3
    4
    /**
    * The synchronization state.
    */
    private volatile int state;

阻塞恢复设计

  • 正确的控制线程暂停和恢复的APIsuspendresume,但它们是不推荐使用的,因为如果先调用的resume那么suspend将感知不到;
  • 解决办法是使用park&unpark来实现线程的暂停和恢复;
  • park&unpark是针对线程的,而不是针对同步器的,因此控制粒度更为精细;
  • park线程还可以通过interrupt打断

队列设计

  • 设计是借鉴CLH队列,它是一种单向无锁队列;

    • CLH是一种基于单向链表的高性能、公平的自旋锁
    • CLH好处:
      • 无锁,使用自旋;
      • 快速,无阻塞;
  • CLH中,每个节点(除了虚拟头节点)都代表一个等待线程.每个节点有一个前驱节点的引用,以实现队列的有序性.前驱节点是队列中的上一个节点,每个节点在自旋等待时会持续尝试访问前驱节点的状态,以判断是否可以进入临界区

  • CLH中的前驱节点不仅仅是用来实现队列的有序性,还用来存储节点所代表等待线程的状态.

    • 具体来说,在CLH中,每个节点存储了一个标志位(例如lockState),用于表示节点所代表的线程是否需要等待或者已经获取到锁.当一个线程需要获取锁时,会创建一个节点,并将其插入到队列的尾部,然后自旋等待.当前驱节点的线程释放锁时,它会将自己的状态写入前驱节点的标志位中,通知后继节点可以获取锁了.这样,每个节点都可以通过访问前驱节点的标志位,判断前驱节点所代表的线程是否已经释放锁,从而决定自己是否可以获取锁.
  • 队列中有headtail两个指针节点,都是用volatile修饰配合CAS使用,每个节点都有state维护节点状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * Head of the wait queue, lazily initialized. Except for
    * initialization, it is modified only via method setHead. Note:
    * If head exists, its waitStatus is guaranteed not to be
    * CANCELLED.
    */
    private transient volatile Node head;

    /**
    * Tail of the wait queue, lazily initialized. Modified only via
    * method enq to add new wait node.
    */
    private transient volatile Node tail;
  • 队列结构

    1
    2
    3
    4
    5
    * <pre>
    * +------+ prev +-----+ +-----+
    * head | | <---- | | <---- | | tail
    * +------+ +-----+ +-----+
    * </pre>

img

Node节点

  • AQS 内部维护了一个同步队列,用于管理同步状态。

    • 当线程获取同步状态失败时,就会将当前线程以及等待状态等信息构造成一个Node节点,将其加入到同步队列中尾部,阻塞该线程
    • 当同步状态被释放时,会唤醒同步队列中“首节点”的线程获取同步状态
  • 类成员

    img
Node属性 属性描述
volatile Node prev; 前驱节点
volatile Node next; 后继节点
volatile Thread thread; 获取同步状态的线程
volatile int waitStatus; 等待状态
Node nextWaiter; 指向下一个处在CONDITION状态的节点
waitStatus枚举 描述
static final int CANCELLED = 1; 由于超时或中断,线程获取锁的请求取消了,节点变成该状态不会再有变化
static final int SIGNAL = -1; 表示当前的后继节点被阻塞,当前节点释放锁时需要唤醒后继节点
static final int CONDITION = -2; 表示节点在条件等待队列,等待被唤醒
static final int PROPAGATE = -3; 表示释放共享锁时需要传播唤醒。PROPAGATE状态用于确保共享模式下的锁释放能够传播到后续节点
0 表示节点处于初始状态。在节点初始化时,等待状态会被设置为0

独占式(基于JDK11)

获取同步状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void lock() {
sync.acquire(1);
}
// ==> java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// 尝试获取锁,若没有获得锁,以独占模式的节点加入队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ==> java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
  • 首先,先尝试非阻塞的方式获取同步状态,如果获取失败(tryAcquire返回false),则会调用addWaiter方法构造Node节点(Node.EXCLUSIVE采用独占式),并安全的(CAS)加入到同步队列的尾部.
尝试获取锁
  • 由子类实现
若尝试失败,则构造节点加入到等待队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 通过“死循环”确保节点被正确添加,最终将其设置为尾节点之后才会返回
for (;;) {
Node oldTail = tail;
// 如果队列不为空,CAS尝试将新Node对象加入AQS队列尾部
if (oldTail != null) {
// 第二次循环
// 将原来的尾节点设置为新Node对象的前驱节点
node.setPrevRelaxed(oldTail);
// 将tail尾节点的指向由原来的oldTail换成新的node节点
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 第一次循环
// 若队列是空队列,在初始化队列,新建一个哨兵节点,头尾节点都指向这个哨兵节点
initializeSyncQueue();
}
}
}
1
2
3
4
5
6
7
8
9
/**
* Initializes head and tail fields on first contention.
*/
private final void initializeSyncQueue() {
Node h;
// 将当前对象的头节点(HEAD)从 null 设置为新创建的节点
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}

img

  • 将创建好的节点添加到队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
// "死循环" 尝试获取锁,或挂起
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 若当前的前驱节点是Head节点,表示当前节点有资格来尝试获取锁
// 继而再次调用tryAcquire方法来尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取成功,设置当前线程对应的Node节点
setHead(node);
// 将哨兵节点的后继节点置为空,方便GC
p.next = null; // help GC
// 返回中断标记为false
return interrupted;
}
// 走到此次表明,当前Node节点前驱节点不是Head节点或者尝试获得锁失败
// 继而判断当前节点是否应该park(进入阻塞状态)
if (shouldParkAfterFailedAcquire(p, node))
// 当前节点需要进入阻塞状态
// 阻塞当前线程
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取当前节点的前驱的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果是 SIGNAL 状态,即等待被占用的资源释放,直接返回 true
// 准备继续调用 parkAndCheckInterrupt 方法
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// ws > 0 表示是CANCELLED取消状态
if (ws > 0) {

// 循环判断前驱节点的前驱节点是否也为CANCELLED状态,若也是CANCELLED,删除该状态的节点,重新连接队列
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这次还没有阻塞,这时需要设置当前的节点的前驱的状态为Node.SIGNAL
// 但下次如果重试不成功,则需要阻塞
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
  • 注意: 是否需要park是由当前节点的前驱节点的waitStatus是否为Node.SIGNAL来决定的,而不是当前节点的waitStatus决定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Convenience method to park and then check if interrupted.
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
// 线程挂起,程序不会继续向下执行
LockSupport.park(this);
// 根据 park 方法 API描述,程序在下述三种情况会继续向下执行
// 1. 被 unpark
// 2. 被中断(interrupt)
// 3. 其他不合逻辑的返回才会继续向下执行

// 因上述三种情况程序执行至此,返回当前线程的中断状态,并清空中断状态
// 如果由于被中断,该方法会返回 true
return Thread.interrupted();
}
acquireQueued抛出异常进入取消状态
  • 查看 try 代码块,只有两个方法会抛出异常:
    • node.processor() 方法: throw new NullPointerException();
    • 自己重写的 tryAcquire() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// 忽略无效节点
if (node == null)
return;

// 将关联的线程信息清空
node.thread = null;

// Skip cancelled predecessors
// 跳过同样是取消状态的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary, although with
// a possibility that a cancelled node may transiently remain
// reachable.
// 跳出上面循环说明找到前驱有效(非取消状态)节点,并获取该有效节点的后继节点
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 将当前节点的状态置为 CANCELLED
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
// 情况一: 如果当前节点处在尾节点,直接从队列中删除自己就好
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点
if (pred != head &&
// 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL
((ws = pred.waitStatus) == Node.SIGNAL ||
// 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
// 上述条件满足
Node next = node.next;
// 情况二: 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
// 情况三: 如果当前节点的前驱节点是头节点,或者上述其他条件不满足,就唤醒当前节点的后继节点
unparkSuccessor(node);
}

node.next = node; // help GC
}
}
  • 其核心目的就是从等待队列中移除CANCELLED的节点,并重新拼接整个队列
img img img
总体流程图

img

释放同步状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void unlock() {
// 释放锁
sync.release(1);
}
public final boolean release(long arg) {
// 调用自定义同步器重写的 tryRelease 方法尝试释放同步状态
if (tryRelease(arg)) {
// 释放成功,获取头节点
Node h = head;
// 存在头节点,并且waitStatus不是初始状态
// 在获取的过程中会将 waitStatus的值从初始状态更新成 SIGNAL 状态
if (h != null && h.waitStatus != 0)
// 解除线程挂起状态
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取头节点的waitStatus
int ws = node.waitStatus;
if (ws < 0)
// 清空头节点的waitStatus值,即置为0
node.compareAndSetWaitStatus(ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取头节点的后继节点
Node s = node.next;
// 判断当前节点的后继节点是否是取消状态,如果是,需要移除,重新连接队列
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点向前查找,找到队列第一个waitStatus状态小于0(非取消状态)的节点
for (Node p = tail; p != node && p != null; p = p.prev)
// 如果是独占式,这里小于0,其实就是 SIGNAL
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
// 解除线程挂起状态
LockSupport.unpark(s.thread);
}
  • 采用从尾节点向前查找的原因
    • 节点入队并不是原子操作,如果此时代码还没执行到pred.next = node; 这时又恰巧执行了unparkSuccessor方法,就没办法从前往后找了,因为后继指针还没有连接起来,所以需要从后往前找
    • 产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此这也是必须要从后往前遍历才能够遍历完全部的Node

独占式响应中断获取同步状态

1
2
3
4
5
6
7
8
9
10
11
12
 public void lockInterruptibly() throws InterruptedException {
// 调用同步器模版方法可中断式获取同步状态
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试非阻塞式获取同步状态失败,如果没有获取到同步状态
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(long arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 获取中断信号后,不再返回 interrupted = true 的值,而是直接抛出 InterruptedException
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

独占式超时限制获取同步状态

  • 就是给定一个时限,在该时间段内获取到同步状态,就返回 true, 否则,返回 false。好比线程给自己定了一个闹钟,闹铃一响,线程就自己返回了,这就不会使自己是阻塞状态了

  • 既然涉及到超时限制,其核心逻辑肯定是计算时间间隔,因为在超时时间内,肯定是多次尝试获取锁的,每次获取锁肯定有时间消耗,所以计算时间间隔的逻辑就像我们在程序打印程序耗时 log 那么简单

    1
    nanosTimeout = deadline - System.nanoTime()
1
2
3
4
5
6
7
public final boolean tryAcquireNanos(long arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(long arg, long nanosTimeout)
throws InterruptedException {
// 超时时间内,为获取到同步状态,直接返回false
if (nanosTimeout <= 0L)
return false;
// 计算超时截止时间
final long deadline = System.nanoTime() + nanosTimeout;
// 以独占方式加入到同步队列中
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
// 计算新的超时时间
nanosTimeout = deadline - System.nanoTime();
// 如果超时,取消当前线程,返回 false
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 判断是最新超时时间是否大于阈值 1000
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
// 挂起线程 nanosTimeout 长时间,时间到,自动返回
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

条件变量

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
// 表示await()方法允许被中断
if (Thread.interrupted())
throw new InterruptedException();
// 创建一个新的节点,节点状态为condition,采用的数据结仍然是链表(单向)
Node node = addConditionWaiter();
// 释放当前的锁,得到锁的状态,并唤醒AQS队列中的一个线程
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果当前节点没有在同步队列(AQS)上,即还没有被signal,则将当前线程阻塞
// 第一次判断的是false,因为前面已经释放锁了
while (!isOnSyncQueue(node)) {
// 通过park挂起当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当这个线程醒来,会尝试拿锁,当acquireQueued返回false就是拿到锁了
// interruptMode != THROW_IE ->表示这个线程没有成功将node入队,但signal执行了enq方法让其入队了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一个等待者不是 null, 则进行清理, 清理 Condition 队列上的节点.
// 如果是 null ,就没有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果lastWaiter是中断的,就清理出去。
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 构建一个Node,waitStatus=CONDITION。这里的链表是一个单向的,所以相比AQS来说会简单很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

为什么这里是单向队列,也没有使用CAS 来保证加入队列的安全性呢?

因为awaitLock范式try中使用的,说明已经获取到锁了,所以就没必要使用CAS了,至于是单向,因为这里还不涉及到竞争锁,只是做一个条件等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 当前节点已经是尾节点了
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 将等待队列中的Node转移到AQS队列,不成功且还有节点则继续循环
} while (!transferForSignal(first) &&
// 队列中还有节点
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将节点添加到同步队列中,并返回其前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 前驱节点的状态为取消状态或者变更前驱状态为SIGNAL失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒同步队列中该线程
// 这是因为如果无法将前驱节点的等待状态设置为 SIGNAL,说明前驱节点可能已经被取消或者有其他原因导致设置失败,此时需要手动唤醒当前节点对应的线程。
LockSupport.unpark(node.thread);
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

共享式(基于JDK11)

获取同步状态

1
2
3
4
5
6
public final void acquireShared(int arg) {
// 调用自定义同步器需要重写的方法,非阻塞式的尝试获取同步状态,如果结果小于零,则获取同步状态失败
if (tryAcquireShared(arg) < 0)
// 调用 AQS 提供的模版方法,进入等待队列
doAcquireShared(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 创建SHARED模式的节点,添加到等待队列中
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
// 进入“自旋”
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头节点,尝试再次获取同步状态
if (p == head) {
// 在此以非阻塞式获取同步状态
int r = tryAcquireShared(arg);
// 如果返回结果大于等于零,才能跳出外层循环返回
if (r >= 0) {
// 这里是和独占式的区别
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
状态 说明
propagate < 0 说明当前线程获取同步状态失败了
propagate > 0 说明当前线程获取同步状态成功了,还有剩余的同步状态可用于其他线程的获取
propagate == 0 说明当前线程获取同步状态成功了,但没有剩余同步状态可用于其他线程的获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 入参,node: 当前节点
// 入参,propagate:获取同步状态的结果值,即上面方法中的变量 r
// 记录旧的头部节点,用于下面的check
Node h = head; // Record old head for check below
// 将当前节点设置为头节点
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 通过 propagate 的值和 waitStatus 的值来判断是否可以调用 doReleaseShared 方法
// 这里的h == null和 (h = head) == null和s == null 是为了防止空指针异常发生的标准写法,但这不代表就一定发现它们为空的情况
// 这里 h == null和(h = head) == null是不可能成立的,因为只要执行过addWaiter,CLH队列至少会有一个Node存在
// 但s == null可能发生,比如node已经是队列最后一个节点了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果后继节点为空或者后继节点为共享类型,则进行唤醒后继节点
// 这里后继节点为空意思是只剩下当前头节点了,另外这里的 s == null 也是判断空指针的标准写法
if (s == null || s.isShared())
doReleaseShared();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// CAS 将头节点的状态设置为0
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 设置成功后才能跳出循环唤醒头节点的下一个节点
unparkSuccessor(h);
}
else if (ws == 0 &&
//将头节点状态CAS设置成 PROPAGATE 状态
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

ReentrantLock

特点

  • 可中断; reentrantLock.lockInterruptibly();
  • 可设置超时时间;reentrantLock.ryLock(long timeout, TimeUnit unit);
  • 可以设置为公平锁;new ReentrantLock(true);
  • 支持多个条件变量;
  • synchronized一样都支持可重入;

Condition 类和 Object 类方法区别区别

  1. Condition 类的 awiat方法和Object类的wait方法等效
  2. Condition类的 signal 方法和Object类的notify方法等效
  3. Condition类的signalAll方法和Object类的notifyAll方法等效
  4. ReentrantLock 类可以唤醒指定条件的线程,而Object的唤醒是随机的

ReentrantLock非公平锁源码分析(JDK11)

加锁分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
reentrantLock.lock();
// ==> java.util.concurrent.locks.ReentrantLock#lock
public void lock() {
sync.acquire(1);
}
// ==> java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// 尝试获取锁,若成功获得以独占模式的节点加入队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ==> java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
1
2
3
4
5
6
7
8
9
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用CAS获得,这里体现了非公平性:不去检查AQS队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁,线程还是当前线程,表示发送了锁重入
else if (current == getExclusiveOwnerThread()) {
// <==> state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败,回到调用处
return false;
}

解锁分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 释放锁
reentrantLock.unlock();
// java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 队列不为空,且waitStatus==Node.SIGNAL才需要unpark
if (h != null && h.waitStatus != 0)
// unpark
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入,只有state减为0,才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

ReentrantLock公平锁源码分析

加锁分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查AQS队列中是否有前驱节点,没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
  • hasQueuedPredecessors

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public final boolean hasQueuedPredecessors() {
    Node h, s;
    // AQS队列不为空
    if ((h = head) != null) {
    // 排除处于取消状态的节点
    if ((s = h.next) == null || s.waitStatus > 0) {
    s = null; // traverse in case of concurrent cancellation
    // 从尾部向头部遍历,找到队列中的老二节点
    for (Node p = tail; p != h && p != null; p = p.prev) {
    if (p.waitStatus <= 0)
    s = p;
    }
    }
    // 判断队列中的老二节点的线程是不是当前线程
    if (s != null && s.thread != Thread.currentThread())
    return true;
    }
    return false;
    }

使用公平锁会有什么问题?

公平锁保证了排队的公平性,非公平锁霸气的忽视这个规则,所以就有可能导致排队的长时间在排队,也没有机会获取到锁,这就是传说中的 “饥饿”