
Understanding AQS in Depth

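AQS refers to AbstractQueuedSynchronizer. It is a framework for implementing blocking locks and related synchronizers that rely on a FIFO wait queue. Concurrency classes such as ReentrantLock, Semaphore and CountDownLatch are built on AQS (CyclicBarrier indirectly, through a ReentrantLock and its Condition). The usual pattern is to subclass AQS, implement its hook methods, and use the subclass as an inner class of the synchronization component.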

Basic Principles

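The core idea of AQS: if the requested shared resource is free, the requesting thread becomes the active worker thread and the resource is marked as locked. If the resource is occupied, a mechanism is needed to block waiting threads and to decide who gets the lock when they are woken up. AQS implements this mechanism with a CLH queue lock: threads that cannot acquire the lock right away are added to the queue.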

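CLH stands for Craig, Landin and Hagersten, its three authors. A CLH lock is a high-performance, fair spin lock based on a linked list: a thread that wants the lock spins on a variable of its predecessor node, and when the predecessor unlocks, the current node stops spinning and takes the lock. For details, see the paper "Building FIFO and Priority-Queuing Spin Locks from Atomic Swap". AQS uses a variant of the CLH queue: each thread requesting the shared resource is wrapped in a node (Node) of the queue, and instead of spinning indefinitely, a waiting thread is blocked (parked) until it is woken up.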

AQS maintains a shared resource variable, state, with volatile semantics, plus a FIFO queue of waiting threads.

Resource Sharing Modes

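Different custom synchronizers compete for the shared resource in different ways. When implementing one, you only need to define how state is acquired and released; maintaining the wait queue (enqueueing a thread when acquisition fails, dequeueing and waking it up, and so on) is already handled by AQS at the upper level.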

Exclusive

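Only one thread can execute at a time, e.g. ReentrantLock. Exclusive locks can further be fair or non-fair, and ReentrantLock supports both. Taking ReentrantLock as the example, the two are defined as follows: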

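  • Fair lock: threads obtain the lock in the order in which they queued; first come, first served.
  • Non-fair lock: a thread that wants the lock first tries to grab it with two CAS attempts (in JDK 8, one in lock() and one more in tryAcquire()); only if both fail does it join the queue and wait to be woken up.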
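As a quick illustration of the two policies, ReentrantLock chooses between them in its constructor. The sketch below uses only public ReentrantLock methods; the class name FairnessDemo is just for illustration, and the remark about hasQueuedPredecessors() reflects the JDK 8 FairSync.tryAcquire source. Fairness itself is hard to observe deterministically, so the example only shows how each policy is selected and that both are reentrant.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDemo {
    public static void main(String[] args) {
        // The no-arg constructor picks the non-fair policy (NonfairSync).
        ReentrantLock nonfair = new ReentrantLock();
        // ReentrantLock(true) picks the fair policy (FairSync): its tryAcquire
        // checks hasQueuedPredecessors() first, so it never barges past queued threads.
        ReentrantLock fair = new ReentrantLock(true);

        System.out.println(nonfair.isFair()); // false
        System.out.println(fair.isFair());    // true

        // Both policies are reentrant: the hold count lives in the AQS state.
        fair.lock();
        fair.lock();
        try {
            System.out.println(fair.getHoldCount()); // 2
        } finally {
            fair.unlock();
            fair.unlock();
        }
    }
}
```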

Shared

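Multiple threads can execute at the same time, e.g. Semaphore and CountDownLatch. ReentrantReadWriteLock can be seen as a combination of the two modes: its read lock is shared, allowing multiple threads to read a resource concurrently, while its write lock is exclusive.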


Template Methods

AQS is designed around the template method pattern. A custom synchronizer is usually built like this (a classic application of the pattern):

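  1. Subclass AbstractQueuedSynchronizer and override the designated methods. (These overrides are simple: they only describe how the shared resource state is acquired and released.)
  2. Compose the AQS subclass into the custom synchronization component and call its template methods (acquire, release, acquireShared, releaseShared, ...), which in turn call the overridden methods.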

A custom synchronizer overrides the following hook methods provided by AQS (they are called by the template methods):

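// Exclusive mode. Tries to acquire the resource; returns true on success, false on failure.
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// Exclusive mode. Tries to release the resource; returns true if it is now fully released, false otherwise.
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// Shared mode. Tries to acquire the resource. Negative: failure; 0: success, but no resources remain;
// positive: success, and resources remain.
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// Shared mode. Tries to release the resource; returns true if the release may allow a waiting
// acquire (shared or exclusive) to succeed, false otherwise.
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
// Whether the resource is held exclusively by the current thread. Only needed when using Conditions.
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}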

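By default, each of these methods throws UnsupportedOperationException. Implementations must be internally thread-safe and should generally be short and non-blocking. According to the AQS Javadoc, the other methods are declared final, so they cannot be overridden: defining these few methods is the only supported way to customize AQS.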
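To make the hook methods concrete, here is a minimal sketch of a non-reentrant exclusive lock, loosely following the Mutex example in the AQS Javadoc. The class Mutex and its helper contendedCount are hypothetical names for this illustration: state 0 means unlocked, 1 means locked, and only tryAcquire, tryRelease and isHeldExclusively are overridden. Queuing, parking and waking are all inherited from AQS through acquire()/release().

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex. state == 0: unlocked; state == 1: locked.
class Mutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Only the thread whose CAS 0 -> 1 succeeds gets the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // volatile write publishes the critical section's effects
            return true; // fully released, so release() may wake a successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    // acquire()/release() are AQS template methods that call the hooks above.
    void lock() { sync.acquire(1); }
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }
    boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); }

    // Two threads increment a shared counter under the mutex.
    static int contendedCount(int perThread) throws InterruptedException {
        Mutex mutex = new Mutex();
        int[] counter = {0};
        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return counter[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(contendedCount(10_000)); // 20000
    }
}
```

Keeping Sync as a private nested class means callers only see lock/unlock, while the AQS template methods remain an implementation detail.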

CLH Queue

A Node mainly consists of four fields: thread, waitStatus, prev and next.

static final class Node {
    // Marker indicating that a node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that a node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // The following four constants are values of waitStatus
    // The thread has cancelled its attempt to acquire the lock
    static final int CANCELLED = 1;
    // The successor's thread needs to be woken up (unparked)
    static final int SIGNAL = -1;
    // The node is waiting on a condition queue
    static final int CONDITION = -2;
    // The next acquireShared should propagate unconditionally
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes. It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    // Predecessor node
    volatile Node prev;

    // Successor node
    volatile Node next;

    // The thread this node belongs to
    volatile Thread thread;

    // Next node waiting on a condition, or the special value SHARED
    Node nextWaiter;

    // Returns true if the node is waiting in shared mode
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // Returns the predecessor node; throws NullPointerException if there is none
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Key Fields

// Head of the wait queue
private transient volatile Node head;

// Tail of the wait queue
private transient volatile Node tail;

// Synchronization state; AQS modifies it atomically with CAS
private volatile int state;

acquire method

Acquires the resource in exclusive mode.

/**
 * Acquires in exclusive mode, ignoring interrupts. Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success. This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire() calls tryAcquire() at least once. If tryAcquire() returns true, acquire() returns immediately; otherwise it calls acquireQueued(addWaiter(Node.EXCLUSIVE), arg).

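The purpose of selfInterrupt(): acquire() ignores interrupts. If the thread is interrupted while it waits in the CLH queue after its first failed attempt, acquireQueued() detects this with Thread.interrupted(), which also clears the flag, and only records it. After the thread is woken up and has acquired the resource, acquire() calls selfInterrupt() (that is, Thread.currentThread().interrupt()) to restore the interrupt status so that the caller can still see it.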
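This rule can be observed with ReentrantLock, whose lock() goes through acquire(). In the sketch below (SelfInterruptDemo is a hypothetical name), the main thread holds the lock, waits until a second thread is queued (using ReentrantLock.hasQueuedThread), interrupts it, and then releases. The waiter is not kicked out of lock(); it acquires normally, and because acquire() called selfInterrupt(), its interrupt status is set again afterwards.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class SelfInterruptDemo {
    // Interrupts a thread while it waits in lock(), then reports the interrupt
    // status that thread sees right after lock() finally returns.
    static boolean interruptStatusAfterLock() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean observed = new AtomicBoolean();
        lock.lock(); // hold the lock so the waiter has to queue
        Thread waiter = new Thread(() -> {
            lock.lock(); // lock() -> acquire(1): ignores interrupts while queued
            try {
                observed.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        try {
            waiter.start();
            // Wait until addWaiter() has put the waiter into the AQS queue.
            while (!lock.hasQueuedThread(waiter))
                Thread.sleep(1);
            waiter.interrupt(); // recorded by acquireQueued, does not abort lock()
        } finally {
            lock.unlock(); // release() unparks the waiter, which then acquires
        }
        waiter.join();
        return observed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptStatusAfterLock()); // true: restored by selfInterrupt()
    }
}
```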

tryAcquire method

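Tries to acquire the given amount of the resource. tryAcquire is a hook method: by default it throws UnsupportedOperationException and must be implemented by the subclass, i.e. the custom synchronizer.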

addWaiter method

Creates a node for the current thread in the given mode and appends it to the tail of the wait queue.

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // node for the current thread in the given mode
    // Try the fast path of enq; backup to full enq on failure
    // Fast path: try to append the node with a single CAS; fall back to enq() if that fails
    Node pred = tail;
    if (pred != null) {
        // Point the node's prev at the current tail
        node.prev = pred;
        // CAS the tail from pred to node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // Queue empty or CAS failed: regular insertion
    enq(node);
    return node;
}

enq method

The regular insertion differs from the fast path in two ways:

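  1. It is a spin loop (for(;;)), so the insertion is guaranteed to succeed eventually.
  2. It handles one more case: when the tail is null (the wait queue is empty), it first initializes the queue with a dummy head node and then inserts the node.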
/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

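Both the fast path and the regular insertion rely on CAS, implemented through the Unsafe class. Unsafe's CAS operations are native methods whose atomicity is guaranteed by the CPU (on x86, for example, by the lock cmpxchg instruction).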

private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
 * CAS head field. Used only by enq.
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
 * CAS tail field. Used only by enq.
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
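The enqueue logic of addWaiter/enq can be imitated with public APIs. This is a hypothetical toy model (ToyWaitQueue), not the AQS code: AtomicReference.compareAndSet stands in for Unsafe.compareAndSwapObject, nodes carry a name instead of a thread, and nothing is ever dequeued. It keeps the two properties discussed above: the queue is lazily initialized with a dummy head, and a node's prev is linked before the CAS on tail, while the predecessor's next is linked only after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy model of addWaiter()/enq(); not the real AQS code.
class ToyWaitQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Same shape as enq(): spin until the CAS on tail succeeds.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head; only the CAS winner sets tail.
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t;                 // prev is linked before the CAS...
                if (tail.compareAndSet(t, node)) {
                    t.next = node;             // ...next only after it succeeds
                    return t;                  // like enq(), return the predecessor
                }
            }
        }
    }

    // Walks prev links from the tail. Every node that became tail already had
    // its prev set, which is one reason unparkSuccessor() scans backwards.
    List<String> namesFromTail() {
        List<String> names = new ArrayList<>();
        for (Node n = tail.get(); n != null; n = n.prev)
            names.add(n.name);
        return names;
    }

    // Enqueues perThread nodes from each of `threads` threads concurrently and
    // returns how many nodes are reachable (excluding the dummy head).
    static int concurrentSize(int threads, int perThread) throws InterruptedException {
        ToyWaitQueue q = new ToyWaitQueue();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            final int id = i;
            Thread w = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    q.enq(new Node("t" + id + "-" + j));
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers)
            w.join();
        return q.namesFromTail().size() - 1;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyWaitQueue q = new ToyWaitQueue();
        q.enq(new Node("A"));
        q.enq(new Node("B"));
        System.out.println(q.namesFromTail());      // [B, A, dummy]
        System.out.println(concurrentSize(4, 1000)); // 4000
    }
}
```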

acquireQueued method

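Loops until the resource is acquired. Only a node whose predecessor is the head calls tryAcquire; if the predecessor is not the head, or the attempt fails, the thread is blocked (parked) once shouldParkAfterFailedAcquire agrees.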

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    // Whether acquiring the resource failed
    boolean failed = true;
    try {
        // Whether the current thread was interrupted while waiting
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head, this node is next in line, so try to acquire
            if (p == head && tryAcquire(arg)) {
                // Acquired: make the current node the new head
                setHead(node);
                p.next = null; // help GC (lets the old head be collected)
                // Mark success
                failed = false;
                // Report whether the thread was interrupted
                return interrupted;
            }
            // Getting here means the predecessor is not the head, or the attempt failed.
            // shouldParkAfterFailedAcquire decides whether this node's thread should block;
            // if it returns true, parkAndCheckInterrupt blocks the thread and, after it wakes,
            // reports whether it was interrupted while blocked; if so, interrupted becomes true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If acquisition ultimately failed (an exception escaped), cancel this node
        if (failed)
            cancelAcquire(node);
    }
}
private void setHead(Node node) {
    head = node;
    node.thread = null; // head's thread is cleared; head stands for the thread that now holds the resource
    node.prev = null;
}

shouldParkAfterFailedAcquire method

Decides whether the thread held by the node should be blocked.

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops. Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // waitStatus of the predecessor
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // SIGNAL = -1;
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // Predecessor is SIGNAL: it will wake this node when it releases or is
        // cancelled, so this thread can park instead of retrying
        return true;
    if (ws > 0) { // CANCELLED = 1;
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // Predecessor has given up (CANCELLED): walk backwards until a predecessor
        // with waitStatus 0 or negative is found, unlinking the cancelled ones
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // Predecessor's waitStatus is 0 or PROPAGATE (-3): set it to SIGNAL so that,
        // on the next iteration of the acquireQueued loop, this method returns true
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt method

Blocks the current thread with LockSupport.park(), and after it wakes up, checks whether the thread was interrupted.

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // block the current thread
    // After waking up, check (and clear) whether the thread was interrupted
    return Thread.interrupted();
}

Thread.interrupted(): returns the current thread's interrupt status and clears it.
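Because Thread.interrupted() clears the flag, acquireQueued has to remember the interrupt in a local variable and later restore it with selfInterrupt(). A tiny sketch (InterruptFlagDemo is a hypothetical name) contrasting it with isInterrupted(), which only reads the flag:

```java
class InterruptFlagDemo {
    // Interrupts the current thread, then reads the flag three ways.
    static boolean[] observe() {
        Thread.currentThread().interrupt();                     // set the flag
        boolean peek = Thread.currentThread().isInterrupted();  // reads, does not clear
        boolean first = Thread.interrupted();                   // reads AND clears
        boolean second = Thread.interrupted();                  // already cleared
        return new boolean[] {peek, first, second};
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true true false
    }
}
```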

LockSupport

AQS blocks and wakes threads with the LockSupport class. Why not use Object's wait/notify?

Compared with Object's wait/notify, LockSupport has two major advantages:

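  1. LockSupport does not need to be called inside a synchronized block, so threads no longer need to share a monitor object; this decouples the threads from each other.
  2. unpark can be called before park (the permit is remembered), so there is no need to worry about the order in which the threads run.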
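Both properties show up in a short sketch (LockSupportDemo is a hypothetical name). Per the LockSupport documentation, each thread has at most one permit, so several unpark calls before a park still let only one park return immediately; park may also return spuriously, which is why AQS always calls it inside a loop.

```java
import java.util.concurrent.locks.LockSupport;

class LockSupportDemo {
    // unpark() before park(): the permit is stored, so park() returns at once.
    // Without the earlier unpark, this park() would normally block.
    static void unparkThenPark() {
        LockSupport.unpark(Thread.currentThread()); // grant the single permit in advance
        LockSupport.park();                         // consumes it and does not block
    }

    public static void main(String[] args) throws InterruptedException {
        unparkThenPark();
        System.out.println("park() returned immediately");

        // No monitor is involved: neither call sits in a synchronized block.
        Thread waiter = new Thread(() -> {
            LockSupport.park(); // returns once the permit is available (or spuriously)
            System.out.println("waiter resumed");
        });
        waiter.start();
        LockSupport.unpark(waiter); // works whether or not the waiter has parked yet
        waiter.join();
    }
}
```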

cancelAcquire

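When failed is still true at the end (acquiring the resource failed, e.g. because tryAcquire threw an exception, or, in the interruptible and timed variants, because the wait was interrupted or timed out), cancelAcquire is called.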

/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    // Find the nearest predecessor whose waitStatus is not CANCELLED (1)
    Node pred = node.prev;
    while (pred.waitStatus > 0) // CANCELLED = 1
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // Mark this node CANCELLED; later nodes will skip over and unlink it
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // If this node is the tail, make pred (which is not cancelled) the new tail
    if (node == tail && compareAndSetTail(node, pred)) {
        // and unlink this node from the queue
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // If pred is not the head, its waitStatus is SIGNAL (-1) or can be CAS-ed
        // to SIGNAL, and its thread is not null,
        // then point pred's next at this node's successor
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // Otherwise (pred is the head, pred could not be set to SIGNAL,
            // or pred's thread is null), wake up this node's successor
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}


Summary

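  1. The thread first calls tryAcquire(arg) to try to acquire the shared resource and returns directly on success; otherwise it is added to the tail of the wait queue in exclusive mode. tryAcquire(arg) is implemented by the custom synchronizer that extends AQS;
  2. Once in the wait queue, the thread keeps trying to acquire the resource in the acquireQueued loop until it succeeds;
  3. In that loop, if the predecessor is not the head or the attempt fails, the thread is blocked; it is woken up when its predecessor releases the resource;
  4. If the thread was interrupted while waiting, acquireQueued records the interrupt and returns true once it has acquired the resource;
  5. If acquireQueued returns true, acquire() calls selfInterrupt() to re-interrupt the thread.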
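The FIFO hand-off described in this summary can be observed with ReentrantLock. In the sketch below (FifoHandoffDemo is a hypothetical name), the main thread holds the lock and starts the waiters one at a time, waiting until each is queued before starting the next. After it releases, only the node right after head ever calls tryAcquire, and no new thread arrives to barge in, so even the default non-fair lock hands over in queue order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FifoHandoffDemo {
    // Queues n threads behind a held lock, one at a time, then releases the lock
    // and returns the order in which the queued threads acquired it.
    static List<Integer> acquisitionOrder(int n) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(); // non-fair, but nobody barges here
        List<Integer> order = new ArrayList<>();  // guarded by lock
        List<Thread> threads = new ArrayList<>();
        lock.lock();
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock(); // release() -> unparkSuccessor(head)
                    }
                });
                threads.add(t);
                t.start();
                // Start the next thread only once this one sits in the queue,
                // so the enqueue order is exactly 0, 1, ..., n - 1.
                while (!lock.hasQueuedThread(t))
                    Thread.sleep(1);
            }
        } finally {
            lock.unlock();
        }
        for (Thread t : threads)
            t.join();
        return order; // safe to read: join() happens-before this point
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(acquisitionOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```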

release method

Releases the resource in exclusive mode.

/**
 * Releases in exclusive mode. Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) { // tryRelease is also implemented by the custom synchronizer
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // wake up the head's successor
        return true;
    }
    return false;
}

tryRelease method

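Releases the given amount of the resource. tryRelease is a hook method: by default it throws UnsupportedOperationException and must be implemented by the subclass, i.e. the custom synchronizer.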

unparkSuccessor method

Wakes up the successor node.

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // If the successor is null or CANCELLED (1), scan backwards from the tail
        // for the non-cancelled node closest to node
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

Why does waking up the successor traverse the queue backwards from the tail?

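Because only the prev links are reliable. When a node is enqueued (addWaiter/enq), node.prev is set before the CAS on tail, but pred.next is set only after the CAS succeeds, so for a short time a new node can be reached from tail through prev but not yet through next. In addition, cancelAcquire only updates next links, never prev, and it ends with node.next = node; under concurrency, a front-to-back traversal through such a node could miss nodes or even loop forever. Walking backwards from tail along prev avoids both problems.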

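Once the successor's thread is woken up, parkAndCheckInterrupt returns whether the thread was interrupted while parked, and control goes back to acquireQueued. If the node's predecessor is the head, the thread tries to acquire the resource; on success, the interrupted flag is returned to acquire(), which re-interrupts the thread if needed. If the predecessor is not the head, shouldParkAfterFailedAcquire unlinks the cancelled predecessors (waitStatus == CANCELLED), so the node moves toward the front of the queue until its predecessor is the head.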

acquireShared method

Acquires the resource in shared mode.

/**
 * Acquires in shared mode, ignoring interrupts. Implemented by
 * first invoking at least once {@link #tryAcquireShared},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquireShared} until success.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquireShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // a negative value means failure
        doAcquireShared(arg);
}

doAcquireShared method

/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    // Append the thread to the tail of the wait queue in shared mode
    final Node node = addWaiter(Node.SHARED);
    // Whether acquiring the resource failed
    boolean failed = true;
    try {
        // Whether the current thread was interrupted while waiting
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            if (p == head) {
                // The predecessor is the head: try to acquire
                int r = tryAcquireShared(arg);
                // r >= 0 means success: become the head and, if resources remain
                // (or a release asked for propagation), wake subsequent blocked threads
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) // re-assert the interrupt recorded while waiting
                        selfInterrupt();
                    // Mark success
                    failed = false;
                    return;
                }
            }
            // Getting here means the predecessor is not the head, or the attempt failed.
            // shouldParkAfterFailedAcquire decides whether this node's thread should block;
            // if it returns true, parkAndCheckInterrupt blocks the thread and, after it wakes,
            // reports whether it was interrupted while blocked; if so, interrupted becomes true
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If acquisition ultimately failed, cancel this node
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared is much like acquireQueued in exclusive mode, with three main differences:

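  1. doAcquireShared creates the node (in SHARED mode) and appends it to the tail of the wait queue inside the method itself, whereas acquire() passes addWaiter(Node.EXCLUSIVE) to acquireQueued;
  2. doAcquireShared calls selfInterrupt() itself, instead of returning the interrupt flag to its caller;
  3. After acquiring the resource, doAcquireShared makes the current node the head and, if resources remain, wakes up subsequent nodes; acquireQueued has no such wake-up step.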

setHeadAndPropagate method

Makes the current node the head and, if needed, wakes up subsequent blocked threads.

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node); // make the current node the head
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    // Wake the successor if resources remain (propagate > 0), or if the old or
    // new head has a negative waitStatus (a previous release asked for propagation)
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

doReleaseShared method

Signals the successor and makes sure the release propagates.

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // If head is SIGNAL, CAS it to 0 (retrying on failure) before waking its successor
            if (ws == Node.SIGNAL) { // SIGNAL = -1
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // SIGNAL = -1
                    continue;            // loop to recheck cases
                // Wake up the successor
                unparkSuccessor(h);
            }
            // If head is 0, CAS it to PROPAGATE (retrying on failure)
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // PROPAGATE = -3
                continue;                // loop on failed CAS
        }
        // If head did not move during this iteration, stop; otherwise loop again
        if (h == head)                   // loop if head changed
            break;
    }
}

The role of Node.PROPAGATE

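While studying AQS, I noticed that the PROPAGATE status is never tested explicitly: doReleaseShared sets it, but the code that reads waitStatus (e.g. setHeadAndPropagate) only checks its sign (waitStatus < 0).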

For details, see:

  1. https://www.cnblogs.com/micrari/p/6937995.html (recommended)
  2. https://www.zhihu.com/question/295925198

releaseShared method

Releases the resource in shared mode.

/**
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // wake up successors
        return true;
    }
    return false;
}
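Shared mode can be sketched with a one-shot latch adapted from the BooleanLatch example in the AQS Javadoc. The releaseAll and queuedThreads helpers are hypothetical additions for this illustration. tryAcquireShared returns a positive value once the latch is open, so a single releaseShared wakes the first queued node, and setHeadAndPropagate/doReleaseShared pass the wake-up along to the rest.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// One-shot latch in shared mode. state == 0: closed; state == 1: open.
class BooleanLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        @Override
        protected int tryAcquireShared(int ignore) {
            // Positive: success, and later shared acquirers may succeed too,
            // so the wake-up keeps propagating through queued SHARED nodes.
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true; // tells releaseShared() to call doReleaseShared()
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled() { return sync.isSignalled(); }
    void signal() { sync.releaseShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    int queuedThreads() { return sync.getQueueLength(); } // helper for the demo

    // Parks n threads on the latch, opens it once, and returns how many got through.
    static int releaseAll(int n) throws InterruptedException {
        BooleanLatch latch = new BooleanLatch();
        AtomicInteger passed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        while (latch.queuedThreads() < n) // wait until all n are in the AQS queue
            Thread.sleep(1);
        latch.signal(); // one releaseShared; propagation wakes every waiter
        for (Thread t : threads)
            t.join();
        return passed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseAll(3)); // 3
    }
}
```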

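In shared mode, acquiring and releasing work by propagation: a thread that acquires or releases may wake the next one. For example, suppose there are 10 resources at first, and threads A, B and C need 5, 4 and 3 of them respectively.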

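  1. A acquires 5 resources; 5 remain, so it wakes B;
  2. B acquires 4 resources; 1 remains, so it wakes C;
  3. C tries to take 3 but only 1 is available, so it blocks again;
  4. A releases 1 resource; 2 remain, so it wakes C;
  5. C tries to take 3 but only 2 are available, so it blocks again;
  6. B releases 2 resources; 4 remain, so it wakes C;
  7. C acquires 3 resources; 1 remains, so it goes on waking the next thread…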
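The arithmetic of this walkthrough can be replayed with java.util.concurrent.Semaphore, which is built on AQS shared mode. This hypothetical sketch (PermitWalkthrough) is single-threaded and uses tryAcquire(int), which returns false instead of queuing, so it checks the permit counts at each step but not the wake-up order.

```java
import java.util.concurrent.Semaphore;

class PermitWalkthrough {
    // Replays steps 1-7 above with 10 permits; returns the three attempts by C
    // followed by the number of permits left at the end.
    static String replay() {
        Semaphore permits = new Semaphore(10);
        StringBuilder log = new StringBuilder();
        permits.tryAcquire(5);                          // A: 10 -> 5
        permits.tryAcquire(4);                          // B: 5 -> 1
        log.append(permits.tryAcquire(3)).append(' ');  // C needs 3, 1 left: false
        permits.release(1);                             // A returns 1: 1 -> 2
        log.append(permits.tryAcquire(3)).append(' ');  // 2 left: false
        permits.release(2);                             // B returns 2: 2 -> 4
        log.append(permits.tryAcquire(3)).append(' ');  // C: 4 -> 1, true
        log.append(permits.availablePermits());         // 1
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(replay()); // false false true 1
    }
}
```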

Condition

Condition is an interface; AQS implements it with its inner class ConditionObject:

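public interface Condition { /* await(), awaitNanos(), signal(), signalAll(), ... */ }

// Inner class of AbstractQueuedSynchronizer
public class ConditionObject implements Condition, java.io.Serializable { /* ... */ }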

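Condition defines two kinds of methods, waiting (await) and notification (signal). Before calling them, the current thread must hold the lock associated with the Condition.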

A Condition object is created by a Lock object (through Lock.newCondition()); in other words, a Condition depends on its Lock.

Usage example

See the source code of LinkedBlockingQueue.
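In the spirit of LinkedBlockingQueue, here is a minimal bounded buffer built on one ReentrantLock and two Conditions, following the pattern of the example in the Condition Javadoc. It is a simplified, hypothetical sketch (BoundedBuffer, transfer): LinkedBlockingQueue itself uses two separate locks (takeLock and putLock), each with its own condition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // each is an AQS ConditionObject
    private final Condition notEmpty = lock.newCondition();
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity)
                notFull.await();    // fully releases the lock while waiting
            items.addLast(e);
            notEmpty.signal();      // moves one waiting consumer to the sync queue
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty())
                notEmpty.await();
            E e = items.removeFirst();
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    // One producer pushes 1..n through a buffer of capacity 2; returns what was taken.
    static List<Integer> transfer(int n) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++)
                    buffer.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> taken = new ArrayList<>();
        for (int i = 0; i < n; i++)
            taken.add(buffer.take());
        producer.join();
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5)); // [1, 2, 3, 4, 5]
    }
}
```

Using two conditions lets put() wake only consumers and take() wake only producers, instead of waking every waiter as a single notifyAll() would.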

Key fields

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;

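These fields show that the condition queue is also made of Node objects, but it is a singly linked list: its nodes are chained through nextWaiter, not through prev/next.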

await method

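The thread calling await must hold the lock, so it conceptually corresponds to the head of the sync queue (a thread must acquire the resource before it can use a Condition).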

public final void await() throws InterruptedException {
    // If the current thread is interrupted, throw InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add the current thread to the condition queue
    Node node = addConditionWaiter();
    // Fully release the lock held by the current thread, saving the previous state
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // Not yet on the sync queue: park the thread
        LockSupport.park(this);
        // The thread wakes up when:
        // 1. it is interrupted, or
        // 2. it has been signalled (moved to the sync queue) and is then unparked
        // (a spurious wakeup just re-checks the loop condition)

        // Check whether it was interrupted while parked; if so, leave the loop
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Now on the AQS CLH sync queue: reacquire the lock with the saved state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE) // THROW_IE: interrupted before being signalled
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT) // REINTERRUPT: interrupted after being signalled
        selfInterrupt();
}

addConditionWaiter method

Adds the current thread to the condition queue.

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter; // tail of the condition queue
    // If lastWaiter is cancelled, clean out.
    // If the tail node was cancelled, call unlinkCancelledWaiters to remove cancelled nodes
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Wrap the current thread in a Node whose status is Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node; // empty condition queue: the new node is also the first node
    else
        t.nextWaiter = node; // append after the current tail
    lastWaiter = node;
    return node;
}
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null; // last non-cancelled node seen so far
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

fullyRelease method

Fully releases the lock held by the current thread and returns the state value it held before the release.

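Consider savedState here. Suppose the thread called lock() twice before condition1.await(); then state is 2, which we can read as the thread holding the lock twice. await() must set state to 0 before parking so that other threads can acquire the lock, and when the thread is woken up it must reacquire both holds (state 2) before it can continue.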
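This fully-release-then-restore behavior can be checked with ReentrantLock. In the sketch below (FullyReleaseDemo is a hypothetical name), the main thread holds the lock twice and waits on a condition. A helper thread can only acquire the lock, set a flag and signal if await() really dropped both holds (otherwise the program would deadlock), and after await() returns, the hold count is back to 2.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullyReleaseDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private boolean helperRan = false; // guarded by lock

    // Holds the lock twice (state == 2), waits on cond, and returns the hold
    // count observed after await() returns.
    int holdCountAfterAwait() throws InterruptedException {
        Thread helper = new Thread(() -> {
            lock.lock(); // succeeds only while the other thread holds nothing
            try {
                helperRan = true;
                cond.signal();
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        lock.lock(); // reentrant acquire: state is now 2
        int holds;
        try {
            helper.start();
            while (!helperRan)
                cond.await(); // fullyRelease(): saves 2 and sets state to 0
            holds = lock.getHoldCount(); // acquireQueued(node, 2) restored both holds
        } finally {
            lock.unlock();
            lock.unlock();
        }
        helper.join();
        return holds;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new FullyReleaseDemo().holdCountAfterAwait()); // 2
    }
}
```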

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState(); // the lock state held by the current thread
        if (release(savedState)) {
            // Released: return the state held before the release
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // On failure, mark the node as Node.CANCELLED
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

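If a thread calls condition1.await() without holding the lock, it still gets into the condition queue, but fullyRelease cannot succeed because the thread does not own the lock: either release(savedState) returns false and the else branch throws IllegalMonitorStateException, or, as with ReentrantLock, tryRelease itself throws IllegalMonitorStateException. In both cases the finally block sets node.waitStatus = Node.CANCELLED, and the already-enqueued node is later removed by unlinkCancelledWaiters.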
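A quick way to see this failure mode, together with the matching up-front check in signal(), is the sketch below. AwaitWithoutLockDemo is a hypothetical name. Where exactly the IllegalMonitorStateException originates may differ between JDK versions; with the JDK 8 code shown here and ReentrantLock, it comes from tryRelease during fullyRelease.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitWithoutLockDemo {
    // Calls await() on a condition whose lock the current thread does not hold.
    static boolean awaitWithoutLockThrows() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.await(); // the release of the (unowned) lock fails
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // signal() checks isHeldExclusively() before touching the condition queue.
    static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.newCondition().signal();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitWithoutLockThrows());  // true
        System.out.println(signalWithoutLockThrows()); // true
    }
}
```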

checkInterruptWhileWaiting method

Checks whether the thread was interrupted while parked.

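// Checks the interrupt status after the thread wakes up in await():
// 1. interrupted before being signalled: returns THROW_IE (-1)
// 2. interrupted after being signalled: returns REINTERRUPT (1)
// 3. not interrupted: returns 0
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
// Returns true if the thread was cancelled (interrupted) before being signalled
final boolean transferAfterCancelledWait(Node node) {
    // signal() changes the status from CONDITION to 0 first. If this CAS succeeds,
    // the node has not been signalled yet, so move it to the sync queue ourselves
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // The CAS failed, so signal() won the race: wait until its enq() completes
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}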

signal method

Wakes up the node that has waited longest in the condition queue (the first node). Before it is woken up, the node is moved to the AQS CLH sync queue.

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    // If the current thread does not hold the lock exclusively, throw IllegalMonitorStateException
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

doSignal method

Transfers the first node of the condition queue to the AQS CLH sync queue, skipping nodes that were cancelled.

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        // Advance firstWaiter by one node;
        // if the old first node had no successor, the queue is now empty, so clear lastWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // Unlink the old first node (helps GC)
        first.nextWaiter = null;
        // If the transfer failed (node was cancelled) and another node exists, try that one
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// Transfers a node from the condition queue to the AQS CLH sync queue
final boolean transferForSignal(Node node) {
    // Change the node's status from CONDITION to 0;
    // if the CAS fails, the node was cancelled before the signal, so return false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // CAS succeeded: append the node to the tail of the sync queue; enq returns its predecessor
    Node p = enq(node);
    // Status of the predecessor
    int ws = p.waitStatus;
    // Normally, set the predecessor to SIGNAL so that it wakes this node later.
    // If the predecessor is cancelled, or setting SIGNAL fails, unpark the node's
    // thread right away so that it can resynchronize itself in acquireQueued
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

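When the transferred thread is eventually unparked (right away by transferForSignal in the special cases above, or normally by unparkSuccessor when its predecessor releases the lock), it resumes inside await(): it calls checkInterruptWhileWaiting, finds its node on the sync queue, and continues with acquireQueued.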

signalAll method

Wakes up all nodes in the condition queue.

/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signalAll() {
    // If the current thread does not hold the lock exclusively, throw IllegalMonitorStateException
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

doSignalAll method

Transfers all nodes to the AQS CLH sync queue.

/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        // Transfer the node to the AQS CLH sync queue
        transferForSignal(first);
        first = next;
    } while (first != null);
}
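The difference between doSignal and doSignalAll is visible through ReentrantLock.getWaitQueueLength(Condition), which counts the nodes still in the condition queue (the caller must hold the lock). In this sketch (SignalVsSignalAllDemo is a hypothetical name), n threads wait on one condition; signal() moves only the first waiter to the sync queue, while signalAll() moves all the remaining ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalVsSignalAllDemo {
    // Returns the condition-queue length before signalling, after signal(),
    // and after signalAll(), with n threads waiting on the same condition.
    static int[] waitQueueLengths(int n) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] released = {false}; // guarded by lock
        List<Thread> waiters = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                lock.lock();
                try {
                    while (!released[0])
                        cond.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            });
            waiters.add(t);
            t.start();
        }
        // Poll until all n waiters sit in the condition queue.
        while (true) {
            lock.lock();
            try {
                if (lock.getWaitQueueLength(cond) == n)
                    break;
            } finally {
                lock.unlock();
            }
            Thread.sleep(1);
        }
        int[] lengths = new int[3];
        lock.lock();
        try {
            lengths[0] = lock.getWaitQueueLength(cond);
            cond.signal();    // doSignal(): only firstWaiter is transferred
            lengths[1] = lock.getWaitQueueLength(cond);
            released[0] = true;
            cond.signalAll(); // doSignalAll(): every remaining waiter is transferred
            lengths[2] = lock.getWaitQueueLength(cond);
        } finally {
            lock.unlock();
        }
        for (Thread t : waiters)
            t.join();
        return lengths;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] l = waitQueueLengths(3);
        System.out.println(l[0] + " " + l[1] + " " + l[2]); // 3 2 0
    }
}
```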

await with a timeout

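Very similar to await(); the main difference is that the thread parks with a timeout (LockSupport.parkNanos). Once the time is up, transferAfterCancelledWait moves the node to the sync queue itself, and when the remaining time is below spinForTimeoutThreshold the thread spins instead of parking.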

/**
 * Implements timed condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled, interrupted, or timed out.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * <li> If timed out while blocked in step 4, return false, else true.
 * </ol>
 */
public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}
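A small sketch of the timeout path (TimedAwaitDemo is a hypothetical name): nobody signals the condition, so once the deadline passes the node transfers itself to the sync queue, the lock is reacquired, and await(time, unit) returns false with the lock held again.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedAwaitDemo {
    // Waits on a condition that nobody signals. Returns {await's result,
    // whether the lock is held again afterwards}.
    static boolean[] awaitUnsignalled(long millis) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            // After the deadline, transferAfterCancelledWait() moves the node to the
            // sync queue itself; the lock is reacquired and false is returned.
            boolean signalled = cond.await(millis, TimeUnit.MILLISECONDS);
            return new boolean[] {signalled, lock.isHeldByCurrentThread()};
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = awaitUnsignalled(20);
        System.out.println(r[0] + " " + r[1]); // false true
    }
}
```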

AbstractQueuedLongSynchronizer

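AbstractQueuedLongSynchronizer has exactly the same structure, fields and methods as AbstractQueuedSynchronizer, except that all state-related parameters and results are long instead of int. It can be useful when creating synchronizers that need 64 bits of state, such as multilevel locks and barriers.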
