0%

深入理解StampedLock

在 JDK 8 中新增了 StampedLock,StampedLock 可以理解为对 ReentrantReadWriteLock 的增强,在原先读写锁的基础上新增了一种叫乐观读(Optimistic Reading)的模式。该模式并不会加锁,所以不会阻塞线程,会有更高的吞吐量和更高的性能。

基本原理

和 AQS 类似,StampedLock 同样维护了一个volatile语义的共享资源变量state和一个FIFO线程等待队列

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制就是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中

几种锁的并发粒度对比:

锁的类型 并发粒度
ReentrantReadWriteLock 读与读互斥,读与写互斥,写与写互斥
ReentrantLock 读与读不互斥,读与写互斥,写与写互斥
StampedLock 读与读不互斥,读与写不互斥,写与写互斥

StampedLock 和 MySQL 的MVCC机制、CopyOnWriteArrayList 有些类似。

使用示例

使用 StampedLock 实现一个线程安全的集合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class Demo {

private static class MyList<T> {
private final List<T> list = new ArrayList<>();

private final StampedLock lock = new StampedLock();

public MyList() {
}

public int size() {
long stamp = lock.tryOptimisticRead();
int size = list.size();
if (!lock.validate(stamp)) {
try {
stamp = lock.readLock();
size = list.size();
} finally {
lock.unlock(stamp);
}
}
return size;
}

public void add(T o) {
long stamp = lock.writeLock();
try {
list.add(o);
} finally {
lock.unlock(stamp);
}
}
}

public static void main(String[] args) throws InterruptedException {
int nCore = Runtime.getRuntime().availableProcessors() * 2;
int nQueue = 100;
ExecutorService executorService = new ThreadPoolExecutor(nCore, nCore,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(nQueue),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
MyList<Integer> myList = new MyList<>();
final CountDownLatch cdMaster = new CountDownLatch(1);
final CountDownLatch cdWorker = new CountDownLatch(nCore);

for (int i = 0; i < nCore; i++) {
executorService.execute(() -> {
try {
cdMaster.await();
myList.add(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
cdWorker.countDown();
}
});
}
cdMaster.countDown();
cdWorker.await();
System.out.println("nCore: " + nCore);
System.out.println("size: " + myList.size());
executorService.shutdown();
}
}

注意 size() 方法中:

  1. 读取数据前先获取版本号
  2. 读取数据:将数据拷贝到线程的栈内存中
  3. 读之后将读之前的版本和读之后的版本进行对比,相同则说明读取期间没有其他线程修改过数据

关键成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class StampedLock implements java.io.Serializable {
// 核心数,用于控制自旋
private static final int NCPU = Runtime.getRuntime().availableProcessors();
// 线程入队前的最大重试次数
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
// 队列头结点自旋获取锁最大重试次数,之后再次进入队列
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
// 重新阻塞前的最大重试次数
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
// 等待溢出自旋锁时的屈服期
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
// 溢出之前用于阅读器计数的位数
private static final int LG_READERS = 7;

// 一些常量,用于state和stamp相关操作
private static final long RUNIT = 1L;
// 1000 0000,第8位表示写锁,每获取一次写锁,则加 1000 0000
private static final long WBIT = 1L << LG_READERS;
// 0111 1111,最低的7位表示读锁,每获取一次读锁,则加 1
private static final long RBITS = WBIT - 1L;
// 0111 1110,读锁的数量
private static final long RFULL = RBITS - 1L;
// 1111 1111,读锁和写锁的状态合并到一起
private static final long ABITS = RBITS | WBIT;
// 1000 0000,后7位表示正在读取的线程数
private static final long SBITS = ~RBITS;

// 锁定状态的初始值
private static final long ORIGIN = WBIT << 1; // 1 0000 0000

// 取消获取方法的特殊值,调用者可以抛出InterruptedException
private static final long INTERRUPTED = 1L;

// WNode节点的status值
private static final int WAITING = -1;
private static final int CANCELLED = 1;

// WNode节点的读写模式
private static final int RMODE = 0;
private static final int WMODE = 1;

/** Wait nodes */
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // 连接所有读的节点
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}

// CLH 队列头结点
private transient volatile WNode whead;
// CLH 队列尾节点
private transient volatile WNode wtail;

// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;

// 共享资源变量
private transient volatile long state;
// 将state超过 RFULL=126的值放到readerOverflow字段中
private transient int readerOverflow;
}
  • state 初始值为 ORIGIN,即:1 0000 0000
  • 当 state & WBIT != 0 时,说明有线程持有写锁(WBIT=1000 0000)
  • 当 state & ABITS !=0 时,说明有线程持有读锁或写锁(ABITS=1111 1111)
  • 每获取一次写锁时,state 会加上 WBIT(WBIT=1000 0000)
  • 每获取一次读锁时,state 会加锁 1

写锁的获取与释放

writeLock方法

获取写锁,获取失败会一直阻塞,直到获得锁成功

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Exclusively acquires the lock, blocking if necessary
* until available.
*
* @return a stamp that can be used to unlock or convert mode
*/
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
  • ((((s = state) & ABITS) == 0L 判断是否有线程持有读写锁,等于0说明没有线程持有读写锁,则通过CAS尝试获取写锁,也就是将 state 会加上 WBIT(WBIT=1000 0000)
  • 如果当前有线程持有读写锁,或者通过CAS修改state的值失败,则执行 acquireWrite 方法

acquireWrite方法

先自旋尝试、加入等待队列、直到最终 Unsafe.park() 挂起线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
* See above for explanation.
*
* @param interruptible true if should check interrupts and if so
* return INTERRUPTED
* @param deadline if nonzero, the System.nanoTime value to timeout
* at (and return zero)
* @return next state, or INTERRUPTED
*/
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
// 判断是否有线程持有读锁或写锁
if ((m = (s = state) & ABITS) == 0L) {
// 尝试获取写锁,通过CAS修改state的状态(加上 WBIT,WBIT=1000 0000)
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns; // 成功获取写锁,则返回 ns 的值,注意这里不能直接返回 state
}
else if (spins < 0)
// m == WBIT 说明持有写锁,wtail == whead 说明 CLH 队列为空
// 如果当前有线程持有写锁,并且CLH队列为空,则设置spins=SPINS(入队前的最大重试次数)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
// 恒成立
if (LockSupport.nextSecondarySeed() >= 0)
--spins; // 自旋次数-1
}
else if ((p = wtail) == null) { // 初始化CLH队列
WNode hd = new WNode(WMODE, null); // 写锁入队列,WMODE 表示写锁,RMODE表示读写
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
// 队列不为空,写锁入队列
node = new WNode(WMODE, p);
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//入队:CAS将node修改为尾节点
// 入队成功则退出循环
p.next = node;
break;
}
}

for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) { // 如果前驱节点为头结点
if (spins < 0)
spins = HEAD_SPINS; // 设置spins=SPINS(CLH队列中有效头结点的最大重试次数)
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long s, ns;
// 判断是否有线程持有读锁或写锁
if (((s = state) & ABITS) == 0L) {
// 尝试获取写锁,通过CAS修改state的状态(加上 WBIT,WBIT=1000 0000)
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
// 如果成功获取写锁,则将CLH队列头结点位置为自己
whead = node;
node.prev = null;
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
}
else if (h != null) { // help release stale waiters
WNode c; Thread w;
// 唤醒所有获取读锁的线程
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
// 将前驱节点状态修改为等待状态
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}

tryWriteLock方法

获取写锁,获取成功返回状态值,获取失败则返回0

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Exclusively acquires the lock if it is immediately available.
*
* @return a stamp that can be used to unlock or convert mode,
* or zero if the lock is not available
*/
public long tryWriteLock() {
long s, next;
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : 0L);
}
  • ((((s = state) & ABITS) == 0L 判断是否有线程持有读写锁,等于0说明没有线程持有读写锁,则通过CAS尝试获取写锁,也就是将 state 会加上 WBIT(WBIT=1000 0000)
  • 如果当前有线程持有读写锁,或者通过CAS修改state的值失败,则直接返回 0

tryWriteLock方法

在指定时间内获得写锁,获取成功返回状态值,获取失败则返回0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Exclusively acquires the lock if it is available within the
* given time and the current thread has not been interrupted.
* Behavior under timeout and interruption matches that specified
* for method {@link Lock#tryLock(long,TimeUnit)}.
*
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @return a stamp that can be used to unlock or convert mode,
* or zero if the lock is not available
* @throws InterruptedException if the current thread is interrupted
* before acquiring the lock
*/
public long tryWriteLock(long time, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
long next, deadline;
if ((next = tryWriteLock()) != 0L)
return next;
if (nanos <= 0L)
return 0L;
// 计算deadline
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}

unlockWrite方法

释放写锁,如果state不匹配或者没有持有写锁,则会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* If the lock state matches the given stamp, releases the
* exclusive lock.
*
* @param stamp a stamp returned by a write-lock operation
* @throws IllegalMonitorStateException if the stamp does
* not match the current state of this lock
*/
public void unlockWrite(long stamp) {
WNode h;
// 如果state不匹配stamp 或没有持有写锁 则抛出异常
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
// 改变写锁状态(下面解释)(WBIT=1000 0000)
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}

state = (stamp += WBIT) 这里明明是解锁,为什么是加 WBIT?

实际上有第8位表示是否持有写锁,加 WBIT 会导致仅为,第8位会变成0,并且第9位会变成1。实际上这是为了后面乐观读锁做铺垫,让每次写锁都留下痕迹,用来处理CAS中的ABA问题。

tryUnlockWrite方法

释放写锁,不会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Releases the write lock if it is held, without requiring a
* stamp value. This method may be useful for recovery after
* errors.
*
* @return {@code true} if the lock was held, else false
*/
public boolean tryUnlockWrite() {
long s; WNode h;
if (((s = state) & WBIT) != 0L) {
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return true;
}
return false;
}

悲观读锁的获取与释放

readLock方法

获取读锁,获取失败会一直阻塞,直到获得锁成功

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Non-exclusively acquires the lock, blocking if necessary
* until available.
*
* @return a stamp that can be used to unlock or convert mode
*/
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
// 如果CLH队列为空,没有线程持有写锁,并且读锁的数量没有溢出
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}

如果CLH队列为空,没有线程持有写锁,并且读锁的数量没有溢出,则通过CAS尝试获取读锁,也就是将 state 加上 RUNIT(RUNIT=1),并且CAS修改成功则返回状态,否则执行 acquireRead 方法。

acquireRead方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/**
* See above for explanation.
*
* @param interruptible true if should check interrupts and if so
* return INTERRUPTED
* @param deadline if nonzero, the System.nanoTime value to timeout
* at (and return zero)
* @return next state, or INTERRUPTED
*/
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
if ((h = whead) == (p = wtail)) {// 如果CLH队列为空
for (long m, s, ns;;) {
// 如果CLH队列为空,没有线程持有写锁,并且读锁的数量没有溢出
// 则通过CAS尝试获取读锁:state 加上 RUNIT(RUNIT=1)
// 将state超过 RFULL=0111 1110=126的值放到readerOverflow字段中
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
else if (m >= WBIT) { // WBIT=1000 0000
if (spins > 0) {
// 自旋
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
if (spins == 0) {
WNode nh = whead, np = wtail;
// 一直获取锁失败,CLH队列不为空,则退出内循环自旋
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
if (p == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null) // 当前节点为空,
// 构建当前节点,模式为RMODE,前驱节点为p,即尾节点
node = new WNode(RMODE, p);
// CLH队列为空 或者尾节点的模式不是RMODE
else if (h == p || p.mode != RMODE) {
// 在尾节点后面添加该节点作为尾节点,然后跳出外层循环
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
// CLH队列不为空并且尾节点是RMODE模式, CAS将该节点添加到尾节点的cowait链中
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
// 失败处理
node.cowait = null;
else {
for (;;) {
WNode pp, c; Thread w;
// 尝试unpark头元素(whead)的cowait中的第一个元素
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
// node节点的前驱就是whead或者p已经是whead或者p的前驱为null
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
// 没有线程持有写锁,并且读锁的数量没有溢出
// 则通过CAS尝试获取读锁:state 加上 RUNIT(RUNIT=1)
// 溢出则将state超过 RFULL的值放到readerOverflow字段
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT); // 条件为读模式
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
//这样做的原因是被其他线程闯入夺取了锁,或者p已经被取消
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
// 出现的中断情况下取消当前节点的cancelWaiter操作
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}

for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}

tryReadLock方法

尝试获取读锁,获取成功返回状态值,获取失败则返回0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Non-exclusively acquires the lock if it is immediately available.
*
* @return a stamp that can be used to unlock or convert mode,
* or zero if the lock is not available
*/
public long tryReadLock() {
for (;;) {
long s, m, next;
// 判断是否有线程持有写锁
if ((m = (s = state) & ABITS) == WBIT)
return 0L;
// 持有读锁的数量小于 RFULL,通过CAS尝试获取读锁,也就是将 state 加上 RUNIT(RUNIT=1)
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
// 将state超过 RFULL的值放到readerOverflow字段
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
}

tryReadLock方法

在指定时间内获得读锁,获取成功返回状态值,获取失败则返回0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Non-exclusively acquires the lock if it is available within the
* given time and the current thread has not been interrupted.
* Behavior under timeout and interruption matches that specified
* for method {@link Lock#tryLock(long,TimeUnit)}.
*
* @param time the maximum time to wait for the lock
* @param unit the time unit of the {@code time} argument
* @return a stamp that can be used to unlock or convert mode,
* or zero if the lock is not available
* @throws InterruptedException if the current thread is interrupted
* before acquiring the lock
*/
public long tryReadLock(long time, TimeUnit unit)
throws InterruptedException {
long s, m, next, deadline;
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
// 判断是否有线程持有写锁
if ((m = (s = state) & ABITS) != WBIT) {
// 没有线程持有写锁,并且读锁数量小于RFULL
// 通过CAS尝试获取读锁,也就是将 state 加上 RUNIT(RUNIT=1)
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
return next;
}
// 将state超过 RFULL的值放到readerOverflow字段
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireRead(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}

unlockRead方法

释放读锁,如果state不匹配或者没有持有读锁,则会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* If the lock state matches the given stamp, releases the
* non-exclusive lock.
*
* @param stamp a stamp returned by a read-lock operation
* @throws IllegalMonitorStateException if the stamp does
* not match the current state of this lock
*/
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
// stamp和state不匹配(前7位不相等可以忽略)或 没有持有读锁或写锁,则会抛出异常
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
// 并且读锁数量小于RFULL,通过CAS尝试解锁,也就是将 state 减去 RUNIT(RUNIT=1)
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
// 溢出后则将readerOverflow变量-1
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}

tryUnlockRead方法

释放读锁,不会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Releases one hold of the read lock if it is held, without
* requiring a stamp value. This method may be useful for recovery
* after errors.
*
* @return {@code true} if the read lock was held, else false
*/
public boolean tryUnlockRead() {
long s, m; WNode h;
// 如果持有读锁
while ((m = (s = state) & ABITS) != 0L && m < WBIT) {
if (m < RFULL) {
// 并且读锁数量小于RFULL,通过CAS尝试解锁,也就是将 state 减去 RUNIT(RUNIT=1)
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return true;
}
}
// 溢出后则将readerOverflow变量-1
else if (tryDecReaderOverflow(s) != 0L)
return true;
}
return false;
}

乐观读锁的获取与释放

tryOptimisticRead

尝试获取乐观读锁

1
2
3
4
5
6
7
8
9
10
/**
* Returns a stamp that can later be validated, or zero
* if exclusively locked.
*
* @return a stamp, or zero if exclusively locked
*/
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
  • 如果当前有线程持有写锁,则返回0
  • 否则记录写锁的状态 (s & SBITS,SBITS=1000 0000) 并返回

validate方法

校验乐观锁获取之后是否有过写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Returns true if the lock has not been exclusively acquired
* since issuance of the given stamp. Always returns false if the
* stamp is zero. Always returns true if the stamp represents a
* currently held lock. Invoking this method with a value not
* obtained from {@link #tryOptimisticRead} or a locking method
* for this lock has no defined effect or result.
*
* @param stamp a stamp
* @return {@code true} if the lock has not been exclusively acquired
* since issuance of the given stamp; else false
*/
public boolean validate(long stamp) {
// 防止重排序问题
U.loadFence();
return (stamp & SBITS) == (state & SBITS);// 比较是否有过写操作
}

这里说一下为什么需要使用内存屏障,使用示例中有如下代码:

1
2
3
4
5
6
7
8
public int size() {
long stamp = lock.tryOptimisticRead();
int size = list.size();
if (!lock.validate(stamp)) {
// ...
}
// ...
}

保证 lock.tryOptimisticRead() 和 lock.validate(stamp) 中间的数据(即:int size = list.size()) load完成。

模式转换

StamedLock还支持这三种锁在一定条件下进行相互转换:

  • tryConvertToWriteLock:转换为写锁
  • tryConvertToReadLock:转换位悲观读锁
  • tryConvertToOptimisticRead:转换为乐观读锁

悲观读占满CPU的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class StampedLockTest {

public static void main(String[] args) throws InterruptedException {
final StampedLock lock = new StampedLock();
Thread t1 = new Thread(() -> {
// 获取写锁
lock.writeLock();
// 模拟程序阻塞等待其他资源
LockSupport.park();
});
t1.start();
// 保证t1获取写锁
Thread.sleep(100);
Thread t2 = new Thread(() -> {
// 阻塞在悲观读锁
lock.readLock();
});
t2.start();
// 保证t2阻塞在读锁
Thread.sleep(100);
// 中断线程t2,会导致线程t2所在CPU飙升
t2.interrupt();
t2.join();
}
}

如果没有中断,那么阻塞在readLock()上的线程在经过几次自旋后,会进入park()等待,一旦进入park()等待,就不会占用CPU了。但是park()这个函数有一个特点,就是一旦线程被中断,park()就会立即返回,返回还不算,它也不给你抛点异常啥的,那这就尴尬了。本来呢,你是想在锁准备好的时候,unpark()的线程的,但是现在锁没好,你直接中断了,park()也返回了,但是,毕竟锁没好,所以就又去自旋了。

转着转着,又转到了park()函数,但悲催的是,线程的中断标记一直打开着,park()就阻塞不住了,于是乎,下一个自旋又开始了,没完没了的自旋停不下来了,所以CPU就爆满了。

要解决这个问题,本质上需要在StampedLock内部,在park()返回时,需要判断中断标记为,并作出正确的处理,比如,退出,抛异常,或者把中断位给清理一下,都可以解决问题。

但很不幸,至少在JDK8里,还没有这样的处理。因此就出现了上面的,中断readLock()后,CPU爆满的问题。请大家一定要注意。

总结

  1. StampedLock 适用于多读场景,读写不互斥
  2. StampedLock 不支持锁重入
  3. StampedLock 没有通知等待机制
坚持原创技术分享,您的支持将鼓励我继续创作!