/**
 * Status field, taking on only the values:
 *   SIGNAL:     The successor of this node is (or will soon be)
 *               blocked (via park), so the current node must
 *               unpark its successor when it releases or
 *               cancels. To avoid races, acquire methods must
 *               first indicate they need a signal,
 *               then retry the atomic acquire, and then,
 *               on failure, block.
 *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 *               Nodes never leave this state. In particular,
 *               a thread with cancelled node never again blocks.
 *   CONDITION:  This node is currently on a condition queue.
 *               It will not be used as a sync queue node
 *               until transferred, at which time the status
 *               will be set to 0. (Use of this value here has
 *               nothing to do with the other uses of the
 *               field, but simplifies mechanics.)
 *   PROPAGATE:  A releaseShared should be propagated to other
 *               nodes. This is set (for head node only) in
 *               doReleaseShared to ensure propagation
 *               continues, even if other operations have
 *               since intervened.
 *   0:          None of the above
 *
 * The values are arranged numerically to simplify use.
 * Non-negative values mean that a node doesn't need to
 * signal. So, most code doesn't need to check for particular
 * values, just for sign.
 *
 * The field is initialized to 0 for normal sync nodes, and
 * CONDITION for condition nodes. It is modified using CAS
 * (or when possible, unconditional volatile writes).
 */
volatile int waitStatus;
// 返回前驱节点 final Node predecessor()throws NullPointerException { Node p = prev; if (p == null) thrownew NullPointerException(); else return p; }
/** No-argument constructor; leaves every field at its default value. */
Node() {    // Used to establish initial head or SHARED marker
}
/**
 * Creates a node for a waiting thread.
 *
 * @param thread the thread that owns this node
 * @param mode   the mode marker, stored in {@code nextWaiter}
 */
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}
/**
 * Creates a node with an explicit initial wait status.
 *
 * @param thread     the thread that owns this node
 * @param waitStatus the initial value of {@code waitStatus}
 */
Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
}
} // closes the enclosing class, whose header lies above this excerpt
关键成员变量
1 2 3 4 5 6 7 8
// 头节点 privatetransientvolatile Node head;
// 尾节点 privatetransientvolatile Node tail;
// 同步状态 AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改 privatevolatileint state;
acquire方法
以独占方式获取资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ publicfinalvoidacquire(int arg){ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode){ Node node = new Node(Thread.currentThread(), mode); // 创建独占模式节点 // Try the fast path of enq; backup to full enq on failure // 尝试将节点快速插入(CAS)等待队列,若失败则执行常规插入(enq方法) Node pred = tail; if (pred != null) { // 将节点的前驱节点指向尾节点 node.prev = pred; // CAS 将尾节点修改为 node 节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // CAS失败,则执行常规插入 enq(node); return node; }
privatestaticfinal Unsafe unsafe = Unsafe.getUnsafe(); /** * CAS head field. Used only by enq. */ privatefinalbooleancompareAndSetHead(Node update){ return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * CAS tail field. Used only by enq. */ privatefinalbooleancompareAndSetTail(Node expect, Node update){ return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ // 获取前继节点的waitStatus值 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // SIGNAL = -1; /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 前驱节点 waitStatus = -1,表示前驱节点完成资源的释放或者中断后,会通知当前节点,不用自旋 returntrue; if (ws > 0) { // CANCELLED = 1; /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 前驱节点 waitStatus > 0,表示前驱节点处于放弃状态(CANCELLED) // 继续往前遍历,直到当前节点的前继节点的ws值为0或负数 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 如果前驱节点的ws值 < -1 // 将前驱节点的ws值设置为Node.SIGNAL // 保证下次自旋(acquireQueued方法中的自旋)时,shouldParkAfterFailedAcquire直接返回true compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
parkAndCheckInterrupt方法
调用LockSupport类的park()方法阻塞当前线程,并在被唤醒后返回当前线程是否已经被中断
1 2 3 4 5 6 7 8 9 10
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ privatefinalbooleanparkAndCheckInterrupt(){ LockSupport.park(this); // 阻塞当前线程 // 如果被唤醒,查看⾃己是不不是被中断的 return Thread.interrupted(); }
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ privatevoidcancelAcquire(Node node){ // Ignore if node doesn't exist if (node == null) return;
node.thread = null;
// Skip cancelled predecessors // 找出waitStatus不为1的前驱节点 Node pred = node.prev; while (pred.waitStatus > 0) // CANCELLED = 1 node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 把当前node的状态设置为CANCELLED,当下一个node排队结束时,自己就会被处理掉 node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. // 如果我们是尾节点,将前驱节点(waitStatus不为1)设置为尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 将当前节点从队列中移除 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; // 如果前驱节点不是头节点,判断前驱节点的waitStatus是否等于Node.SIGNAL(-1) // 如果前驱节点的waitStatus不为-1,则设置为-1 // 如果前驱节点的waitStatus为-1,或者被成功设置为-1 // 把当前节点的前驱节点的后继指针指向当前节点的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果前驱节点是头节点,或者前驱节点的waitStatus不为-1,设置-1失败了 // 则唤醒当前节点的后继节点 unparkSuccessor(node); }
node.next = node; // help GC } }
/** * Wakes up node's successor, if one exists. * * @param node the node */ privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 如果node节点的后继节点为null,或者waitStatus为1,则从后向前遍历 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ publicfinalbooleanrelease(int arg){ if (tryRelease(arg)) { // tryRelease也是有继承AQS的自定义同步器来具体实现 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤醒头结点的后继节点 returntrue; } returnfalse; }
/** * Wakes up node's successor, if one exists. * * @param node the node */ privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 如果node节点的后继节点为null,或者waitStatus为1,则从后向前遍历 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ publicfinalvoidacquireShared(int arg){ if (tryAcquireShared(arg) < 0) // 负数表示获取失败 doAcquireShared(arg); }
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ privatevoidsetHeadAndPropagate(Node node, int propagate){ Node h = head; // Record old head for check below setHead(node); // 将自己设置为头节点 /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ // 如果资源还有剩余,则唤醒后继节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } privatevoidsetHead(Node node){ head = node; node.thread = null; node.prev = null; }
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ privatevoiddoReleaseShared(){ /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 若head节点状态为SIGNAL,则自旋CAS状态设置为0之后才能唤醒head结点的后继节点 if (ws == Node.SIGNAL) { // SIGNAL = -1 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // SIGNAL = -1 continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } // 若head节点状态为0,则自旋CAS将节点状态设置为PROPAGATE elseif (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // PROPAGATE = -3 continue; // loop on failed CAS } // head指针在自旋期间未发生移动的话,跳出自旋 if (h == head) // loop if head changed break; } }
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ publicfinalbooleanreleaseShared(int arg){ if (tryReleaseShared(arg)) { doReleaseShared(); // 唤醒后继节点 returntrue; } returnfalse; }
/** First node of condition queue. */ privatetransient Node firstWaiter; /** Last node of condition queue. */ privatetransient Node lastWaiter; /** Mode meaning to reinterrupt on exit from wait */ privatestaticfinalint REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ privatestaticfinalint THROW_IE = -1;
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal(){ // 如果当前线程不是持有该condition的锁,抛IllegalMonitorStateException异常 if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first){ do { // 头节点指针往后移动一个节点 // 若当前头结点的后继节点为null,则尾节点置为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 将之前头节点的后继节点置为null,便于GC first.nextWaiter = null; // 头节点未成功转移到AQS的CLH同步队列且头节点不为null,继续自旋 } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 用于将条件队列节点转移到AQS的CLH同步队列 finalbooleantransferForSignal(Node node){ // 如果CAS失败,说明节点在signal之前被取消了,返回false // 设置入队节点的等待状态为0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse; // 设置成功则AQS的CLH同步队列尾部. Node p = enq(node); // 获取入队节点的前继节点的状态 int ws = p.waitStatus; // 1. 如果前继节点取消,则直接唤醒当前节点线程 // 2. 如果为非取消节点则将前继节点设置为SIGNAL(等前驱节点唤醒自己) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); returntrue; }
线程被唤醒后,则会进入checkInterruptWhileWaiting方法
signalAll方法
唤醒在条件等待队列中所有的节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignalAll(){ // 如果当前线程不是持有该condition的锁,抛IllegalMonitorStateException异常 if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
doSignalAll方法
将所有节点转移到AQS的CLH同步队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ privatevoiddoSignalAll(Node first){ lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; // 将节点转移到AQS的CLH同步队列 transferForSignal(first); first = next; } while (first != null); }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawait(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) thrownew InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); finallong deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }