/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ privatetransientvolatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ privatetransientvolatile Node tail;
// java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { // 获取当前节点的前驱的状态 intws= pred.waitStatus; if (ws == Node.SIGNAL) // 如果是 SIGNAL 状态,即等待被占用的资源释放,直接返回 true // 准备继续调用 parkAndCheckInterrupt 方法 /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; // ws > 0 表示是CANCELLED取消状态 if (ws > 0) {
// 循环判断前驱节点的前驱节点是否也为CANCELLED状态,若也是CANCELLED,删除该状态的节点,重新连接队列 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 这次还没有阻塞,这时需要设置当前的节点的前驱的状态为Node.SIGNAL // 但下次如果重试不成功,则需要阻塞 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } returnfalse; }
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ privatevoidcancelAcquire(Node node) { // Ignore if node doesn't exist // 忽略无效节点 if (node == null) return;
// 将关联的线程信息清空 node.thread = null;
// Skip cancelled predecessors // 跳过同样是取消状态的前驱节点 Nodepred= node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary, although with // a possibility that a cancelled node may transiently remain // reachable. // 跳出上面循环说明找到前驱有效(非取消状态)节点,并获取该有效节点的后继节点 NodepredNext= pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 将当前节点的状态置为 CANCELLED node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. // 情况一: 如果当前节点处在尾节点,直接从队列中删除自己就好 if (node == tail && compareAndSetTail(node, pred)) { pred.compareAndSetNext(predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; // 1. 如果当前节点的有效前驱节点不是头节点,也就是说当前节点不是头节点的后继节点 if (pred != head && // 2. 判断当前节点有效前驱节点的状态是否为 SIGNAL ((ws = pred.waitStatus) == Node.SIGNAL || // 3. 如果不是,尝试将前驱节点的状态置为 SIGNAL (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { // 上述条件满足 Nodenext= node.next; // 情况二: 将当前节点有效前驱节点的后继节点指针指向当前节点的后继节点 if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { // 情况三: 如果当前节点的前驱节点是头节点,或者上述其他条件不满足,就唤醒当前节点的后继节点 unparkSuccessor(node); }
/** * Wakes up node's successor, if one exists. * * @param node the node */ privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 获取头节点的waitStatus intws= node.waitStatus; if (ws < 0) // 清空头节点的waitStatus值,即置为0 node.compareAndSetWaitStatus(ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取头节点的后继节点 Nodes= node.next; // 判断当前节点的后继节点是否是取消状态,如果是,需要移除,重新连接队列 if (s == null || s.waitStatus > 0) { s = null; // 从尾节点向前查找,找到队列第一个waitStatus状态小于0(非取消状态)的节点 for (Nodep= tail; p != node && p != null; p = p.prev) // 如果是独占式,这里小于0,其实就是 SIGNAL if (p.waitStatus <= 0) s = p; } if (s != null) // 解除线程挂起状态 LockSupport.unpark(s.thread); }
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; if (first != null) doSignal(first); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first) { do { // 当前节点已经是尾节点了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 将等待队列中的Node转移到AQS队列,不成功且还有节点则继续循环 } while (!transferForSignal(first) && // 队列中还有节点 }
finalbooleantransferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 将节点添加到同步队列中,并返回其前驱节点 Nodep= enq(node); intws= p.waitStatus; // 前驱节点的状态为取消状态或者变更前驱状态为SIGNAL失败 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒同步队列中该线程 // 这是因为如果无法将前驱节点的等待状态设置为 SIGNAL,说明前驱节点可能已经被取消或者有其他原因导致设置失败,此时需要手动唤醒当前节点对应的线程。 LockSupport.unpark(node.thread); returntrue; }
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Nodet= tail; if (t == null) { // Must initialize if (compareAndSetHead(newNode())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignalAll() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; if (first != null) doSignalAll(first); }
1 2 3 4 5 6 7 8 9 10 11 12 13
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ privatevoiddoSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Nodenext= first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ privatevoidsetHeadAndPropagate(Node node, int propagate) { // 入参,node: 当前节点 // 入参,propagate:获取同步状态的结果值,即上面方法中的变量 r // 记录旧的头部节点,用于下面的check Nodeh= head; // Record old head for check below // 将当前节点设置为头节点 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ // 通过 propagate 的值和 waitStatus 的值来判断是否可以调用 doReleaseShared 方法 // 这里的h == null和 (h = head) == null和s == null 是为了防止空指针异常发生的标准写法,但这不代表就一定发现它们为空的情况 // 这里 h == null和(h = head) == null是不可能成立的,因为只要执行过addWaiter,CLH队列至少会有一个Node存在 // 但s == null可能发生,比如node已经是队列最后一个节点了 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Nodes= node.next; // 如果后继节点为空或者后继节点为共享类型,则进行唤醒后继节点 // 这里后继节点为空意思是只剩下当前头节点了,另外这里的 s == null 也是判断空指针的标准写法 if (s == null || s.isShared()) doReleaseShared(); } }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ privatevoiddoReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Nodeh= head; if (h != null && h != tail) { intws= h.waitStatus; if (ws == Node.SIGNAL) { // CAS 将头节点的状态设置为0 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases // 设置成功后才能跳出循环唤醒头节点的下一个节点 unparkSuccessor(h); } elseif (ws == 0 && //将头节点状态CAS设置成 PROPAGATE 状态 !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }