0%

深入理解ReentrantReadWriteLock

ReentrantReadWriteLock是读写锁的实现,写锁基于AQS的独占模式,读锁基于AQS的共享模式,读锁和写锁默认采用非公平锁,是一种可重入锁。

可以在构造方法中指定是公平锁还是非公平锁

ReentrantLock是排他锁,排他锁在同一时刻仅有一个线程可以进行访问,实际上独占锁是一种相对比较保守的锁策略,独占锁模式下的读/读、读/写、写/写操作都不能同时发生,这在一定程度上降低了吞吐量。然而读操作之间不存在数据竞争问题,如果读/读操作能够以共享锁的方式进行,那会进一步提升性能。

为了解决读写冲突问题,Doug Lea设计了ReadWriteLock接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock(); // 获取读锁

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock(); // 获取写锁
}

共享资源的读/写操作分开进行管理,类似于数据库中的S锁(共享锁)和X锁(独占锁),其遵循如下原则:

  • 共享资源只允许加一种锁,或读锁,或写锁,不能同时加;
  • 共享资源可以被多个线程同时加读锁,而写锁只允许加一把;
  • 当共享资源被读锁占用时,写线程只能等待;当共享资源被写锁占用时,读线程只能等待。

不同线程读/写、写/写是互斥的,而读/读是互不影响的,大大提升了读操作的效率。

构造方法

注意:读写锁都是使用同一个sync对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ReentrantReadWriteLock() {
this(false); // 默认非公平锁
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
// 读写锁都是使用同一个sync对象
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

关键成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock; // 读锁
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock; // 写锁
/** Performs all synchronization mechanics */
final Sync sync; // 公平锁或非公平锁
abstract static class Sync extends AbstractQueuedSynchronizer {
// 下面这块说的就是将 state 一分为二,高 16 位用于共享模式,低16位用于独占模式
static final int SHARED_SHIFT = 16;
// 每次增加读锁同步状态,就相当于增加SHARED_UNIT
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁或写锁的最大请求数量(包含重入)
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 低16位的MASK,用来计算写锁的同步状态
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
// 取 c 的高 16 位值,代表读锁的获取次数(包括重入)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 返回共享锁数量
/** Returns the number of exclusive holds represented in count */
// 取 c 的低 16 位值,代表写锁的重入次数,因为写锁是独占模式
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 这个嵌套类的实例用来记录每个线程持有的读锁数量(即重入次数)
static final class HoldCounter {
// 持有的读锁数
int count = 0;
// 线程 id
final long tid = getThreadId(Thread.currentThread());
}
// ThreadLocal 的子类
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

// 组合使用上面两个类,用 ThreadLocal 来记录当前线程持有的读锁数量
private transient ThreadLocalHoldCounter readHolds;

// 用于缓存,记录"最后一个获取读锁的线程"的读锁重入次数,
// 所以不管哪个线程获取到读锁后,就把这个值占为已用,这样就不用到 ThreadLocal 中查询 map 了
// 算不上理论的依据:通常读锁的获取很快就会伴随着释放,
// 显然,在 获取->释放 读锁这段时间,如果没有其他线程获取读锁的话,此缓存就能帮助提高性能
private transient HoldCounter cachedHoldCounter;

// 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

Sync() {
// 初始化 readHolds 这个 ThreadLocal 属性
readHolds = new ThreadLocalHoldCounter();
// 为了保证 readHolds 的内存可见性
setState(getState()); // ensures visibility of readHolds
}

ReadLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* The lock returned by method {@link ReentrantReadWriteLock#readLock}.
*/
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 加锁,基于AQS的共享模式获取资源,中断先记录中断,最后再selfInterrupt()
public void lock() {
sync.acquireShared(1);
}
// acquireShared是父类AQS中的方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 加锁,基于AQS的共享模式获取资源,中断则抛出异常(会响应中断)
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 尝试加锁
public boolean tryLock() {
return sync.tryReadLock();
}
// 释放锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void unlock() {
sync.releaseShared(1);
}
}

lock方法

加读锁,也是共享锁

1
2
3
4
5
6
7
8
9
10
11
12
13
// 加锁,基于AQS的共享模式获取资源,中断先记录中断,最后再selfInterrupt()
public void lock() {
sync.acquireShared(1);
}
// acquireShared是父类AQS中的方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 加锁,基于AQS的共享模式获取资源,中断则抛出异常(会响应中断)
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

tryAcquireShared方法

尝试以共享模式获取资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// tryAcquireShared是Sync中的方法
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败(如果持有写锁的是当前线程,是可以继续获取读锁的)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 总的读锁数量
int r = sharedCount(c);
/*
* readerShouldBlock():读锁是否需要等待(公平锁原则)
* r < MAX_COUNT:总的读锁数量小于最大数(65535),防止溢出
* compareAndSetState(c, c + SHARED_UNIT):设置总的读锁数量+1
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 总的读锁数量+1

// 到这说明成功获取读锁
if (r == 0) {
// r == 0 表示第一个读锁线程,第一个获取读锁firstRead是不会加入到readHolds中
// 记录 firstReader 为当前线程,及其持有的读锁数量:1
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 当前线程为第一个读线程,表示第一个读锁线程重入
// 当前线程持有的读锁数量+1
firstReaderHoldCount++;
} else { // 总的读锁数量不为0并且不为当前线程
// 上面说了 cachedHoldCounter 用于缓存最后一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 如果cachedHoldCounter 缓存的不是当前线程,则设置为缓存当前线程的 HoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// cachedHoldCounter是当前线程,但当前线程持有读锁数量为0,则加入到readHolds中
readHolds.set(rh);
// 当前线程持有的读锁数量+1
rh.count++;
}
// return 大于 0 的数,代表成功获取到了共享锁,即读锁
return 1;
}
// 到这说明获取读锁需要等待
return fullTryAcquireShared(current);
}

readerShouldBlock方法

判断读锁是否需要等待

非公平锁实现

需要判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话让写锁先来,避免写锁饥饿

1
2
3
4
5
6
7
8
9
10
11
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// 判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话让写锁先来,避免写锁饥饿
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

注意:这里判断了head 的第一个后继节点是否是来获取写锁,对写锁进行了额外的照顾。

公平锁实现

判断阻塞队列中有其他元素在等待锁

1
2
3
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}

fullTryAcquireShared方法

全力获取共享锁。只有当readerShouldBlock() 返回 true,或者compareAndSetState(c, c + SHARED_UNIT) CAS 失败,存在竞争,也可能是读锁数量到达上限。

  1. readerShouldBlock() 返回 true
    1. 非公平模式下,head 的第一个后继节点是否是来获取写锁的
    2. 公平模式下,阻塞队列中有其他元素在排队
  2. CAS 失败
  3. 读锁数量到达上限
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
// 如果其他线程持有了写锁,直接到阻塞队列排队
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1; // 返回负数代表获取共享锁失败,获取失败要到阻塞队列排队
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 判断读锁是否需要等待,但也可能是重入
// 下面这部分代码是处理重入的
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // firstReader是第一个获取读锁的线程,主要是缓存作用,避免频繁查询readHolds
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter; // cachedHoldCounter是最后一个获取读锁的线程,主要是缓存作用,避免频繁查询readHolds
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get(); // 从查询获取HoldCounter
// 如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么remove掉,然后到阻塞队列排队
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1; // 返回负数代表获取共享锁失败,获取失败要到阻塞队列排队
}
}

if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS给读锁数量+1,尝试获取读锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
// 如果当前读锁的总数量为0,则将当前线程设置为第一个获取读锁的线程,设置当前持有的读锁为1
// 值得注意的是,如果设置了firstReader,就不会放入readHolds中
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 如果第一个获取读锁的线程就是当前线程
firstReaderHoldCount++; // 设置当前持有的读锁为1
} else {
// 下面这也是先查询缓存
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; // 当前线程持有的读锁数量+1
// 设置最后一个获取读锁的线程为当前线程
cachedHoldCounter = rh; // cache for release
}
return 1; // 返回1表示获取成功
}
}
}

注意:如果使用firstReader保存了当前线程,就不会再放入readHolds,而cachedHoldCounter保存了当前线程后,还会放入readHolds

unlock方法

解锁

1
2
3
4
5
6
7
8
9
10
11
public void unlock() {
sync.releaseShared(1);
}
// 父类AQS中的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared方法

以共享模式释放资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// tryReleaseShared是Sync中的方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) { // 如果当前线程是第一个获取读锁的线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 如果当前线程获取的读锁数量是1,则将firstReader置为空,后续成功解锁之后就不再拥有锁了
firstReader = null;
else // 如果当前线程获取的读锁数量不为1,则将读锁数量-1
firstReaderHoldCount--;
} else {
// 下面主要是查询缓存,并更新缓存中当前线程持有的读锁数量(读锁数量-1)
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// CAS操作总读锁数量-1
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了
// 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程
return nextc == 0;
}
}

tryLock方法

尝试加锁,立即返回是否加锁成功,虽然也有自旋过程,但不会进入AQS中的同步队列

1
2
3
public boolean tryLock() {
return sync.tryReadLock();
}

tryReadLock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// tryReadLock是Sync中的方法
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
// 如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败(如果持有写锁的是当前线程,是可以继续获取读锁的)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS操作将总的读锁数量+1,下面的代码和tryAcquireShared一样
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}

WriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
*/
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 加锁,基于AQS的独占模式获取资源,中断先记录中断,最后再selfInterrupt()
public void lock() {
sync.acquire(1);
}
// acquire是父类AQS中的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 加锁,基于AQS的共享模式获取资源,中断则抛出异常(会响应中断)
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试加锁
public boolean tryLock( ) {
return sync.tryWriteLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 释放锁
public void unlock() {
sync.release(1);
}
// 判断当前线程是否为持有独占锁的线程
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
// 获取当前线程持有写锁的数量
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

lock方法

加写锁,也是独占锁

1
2
3
4
5
6
7
8
9
10
// 加锁,基于AQS的独占模式获取资源,中断先记录中断,最后再selfInterrupt()
public void lock() {
sync.acquire(1);
}
// acquire是父类AQS中的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire方法

尝试以独占模式获取资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 获取独占锁的重入数
// 当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
// 1.如果没有线程持有写锁,但又线程持有读锁,那获取写锁失败,需要等读锁释放
// 2.如果有线程持有写锁,但持有写锁的线程不是自己,需要等写锁释放
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 设置独占锁重入次数+1,因为这里acquires=1
setState(c + acquires);
return true;
}
// 判断写锁是否应该阻塞
// CAS设置独占锁重入次数+1
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 到这里说明CAS成功,将设置持有写锁的线程为当前线程
setExclusiveOwnerThread(current);
return true; // 返回获取成功
}

writerShouldBlock方法

非公平锁实现

非公平锁不需要判断是否有人排队之类的,这里返回不需要阻塞,直接CAS尝试获取独占锁

1
2
3
final boolean writerShouldBlock() {
return false; // writers can always barge
}

公平锁实现

判断阻塞队列中有其他元素在等待锁(任意锁,因为读锁也可以阻塞写锁)

1
2
3
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}

unlock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
sync.release(1);
}
// 父类AQS中的方法,释放成功锁后,如果当前节点的waitStatus != 0,则需要唤醒后继节点,忘了的话复习下AQS吧
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
// tryRelease是Sync中的方法
// 因为写锁是独占锁,具有排他性,直接state减1就是了
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // state-1
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 如果写锁持有的数量为0,则将当前持有写锁的线程设置为null
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

锁降级

Doug Lea 将持有写锁的线程,去获取读锁的过程称为锁降级(Lock downgrading)。这样,此线程就既持有写锁又持有读锁。

但是,锁升级是不可以的。线程持有读锁的话,在没释放的情况下不能去获取写锁,因为会发生死锁

因为获取写锁的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
// 1.如果没有线程持有写锁,但又线程持有读锁,那获取写锁失败,需要等读锁释放
// 2.如果有线程持有写锁,但持有写锁的线程不是自己,需要等写锁释放
return false;
...
}
...
}

假设线程 A 先获取了读锁,然后获取写锁,那么上面的方法会返回false,线程A就到阻塞队列休眠了,自己把自己弄休眠了,而且可能之后就没人去唤醒它了。

坚持原创技术分享,您的支持将鼓励我继续创作!