0%

RedissonLock源码分析

RLock是Redisson中一种可重入的分布式锁,RLock是一个接口,同时继承了java.util.concurrent.locks.Lock接口。

RedissonLock是RLock接口的默认实现,是一种非公平的可重入的分布式锁。并且内部实现了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,防止业务执行较长导致锁被自动释放。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

Redisson版本:3.13.3

使用案例

RLock继承了java.util.concurrent.locks.Lock接口,使用方式和Lock一样:

1
2
3
4
5
6
7
Lock l = redissonClient.getLock("name");
l.lock();
try {
// access the resource protected by this lock
} finally {
l.unlock();
}

RedissonLock源码分析

RedissonLock使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
public String test1() {
RLock lock = redissonClient.getLock("good");
lock.lock();
try {
// 减库存
NUM--;
} finally {
lock.unlock();
}
return "减库存成功,NUM=" + NUM;
}

redissonClient.getLock(“good”)

getLock默认初始化了一个RedissonLock实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Returns Lock instance by name.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantees an acquire order by threads.
* <p>
* To increase reliability during failover, all operations wait for propagation to all Redis slaves.
*
* @param name - name of object
* @return Lock object
*/
RLock getLock(String name);

@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

RedissonLock构造方法

1
2
3
4
5
6
7
8
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

id是一个UUID,用于保证自己持有的锁不会被其他线程解锁,因为用的同一个commandExecutor对象,所以这里不会重复生成UUID

internalLockLeaseTime是锁的超时时间,默认是30s。

lock方法

加锁

leaseTime表示锁的超时时间,对应Redis中key的过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试加锁,加锁失败则返回锁的ttl
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { // ttl为空表示加锁成功
return;
}
// 订阅锁释放事件
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}

try {
while (true) {
// 尝试获取锁,加锁失败则返回锁的ttl
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { // ttl为空表示加锁成功
break;
}

// waiting for message
if (ttl >= 0) {
try {
// 通过信号量来阻塞线程,每次阻塞时间为锁的剩余过期时间
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}

tryAcquire方法

尝试加锁,如果加锁失败则返回锁还有多少毫秒过期,加锁成功则返回空

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

protected final <V> V get(RFuture<V> future) {
return commandExecutor.get(future);
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// lock acquired
if (ttlRemaining == null) { // 加锁成功,则启动看门狗用于续锁
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// 尝试加锁
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

lua脚本 加锁

这里使用了lua脚本来保证加锁操作的原子性

1
2
3
4
5
6
7
8
9
10
11
if (redis.call('exists', KEYS[1]) == 0) then 
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
  1. 如果锁不存在,则创建锁(Hash结构)。然后使用hincrby命令(如果key不存在,value则为0),相当于往这个Hash中添加:Key为”uuid:线程id“,value为”1“(key使用uuid+线程id是为了保证不被其他线程解锁,value则表示锁的重入次数)。然后设置锁Hash的过期时间,返回null。
  2. 如果锁已存在并且value存在于Hash中,说明是自己加的锁,则将锁的重入次数+1,然后设置锁的过期时间,返回null。
  3. 锁存在,但不是自己创建的,则返回锁还有多少毫秒过期。

Watch Dog 续锁

设置定时任务每隔1/3的过期时间进行续锁,解锁时也会取消续锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) { // 已经启动过,则无需启动
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}

private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}

RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}

if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

// 判断自己是不是还持有锁,如果持有锁则重新设置过期时间,返回1,没有持有锁则返回0
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}

定时任务-netty时间轮

netty的时间轮算法的解读

  • 时间轮是一种高效利用线程资源来进行批量化调度的一种调度模型(调度任务全部都绑定到同一个的调度器)。
  • 时间轮调度器的时间精度可能不是很高,取决于时间段“指针”单元的最小粒度大小。
  • 串行执行,所有的任务是依次执行,容易出现调度超时和任务堆集的情况。
  • 无法再宕机之后恢复任务重新调度。
1
2
3
4
5
public class MasterSlaveConnectionManager implements ConnectionManager {

private HashedWheelTimer timer;
// timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);
}

subscribe方法

订阅锁释放事件,设置监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
// entryName:e23eb663-8186-4efa-8cb5-bc817533c903:good
// channelName:redisson_lock__channel:{good}
public RFuture<E> subscribe(String entryName, String channelName) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};

Runnable listener = new Runnable() {

@Override
public void run() {
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}

E value = createEntry(newPromise);
value.acquire();

E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}

RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener);
listenerHolder.set(listener);

return newPromise;
}

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
// 锁释放,唤醒监听器
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 释放信号量,被阻塞线程会被唤醒
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}

value.getLatch().release(value.getLatch().getQueueLength());
}
}

tryLock方法

尝试加锁,设置最大等待时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= System.currentTimeMillis() - current;
if (time <= 0) { // 等待超时了,设置获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}

current = System.currentTimeMillis();
// 订阅锁释放事件
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 如果返回false,说明等待超时了,锁可能自动释放,但没发布锁释放事件
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId); // 取消订阅
}
});
}
// 等待超时了,设置获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}

try {
// 重新计算锁的剩余时间
time -= System.currentTimeMillis() - current;
if (time <= 0) { // 等待超时了,设置获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}

while (true) { //不超时的情况下,循环等待锁释放
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { // ttl为空表示加锁成功
return true;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { // 等待超时了,设置获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}

// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 通过信号量来阻塞线程,每次阻塞时间为锁的剩余过期时间
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { // 等待超时了,设置获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 最后,取消订阅
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}

unLock方法

解锁,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);

future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId); // 取消看门狗续锁任务

if (e != null) {
result.tryFailure(e);
return;
}

if (opStatus == null) { // 返回为空,则抛出IllegalMonitorStateException异常
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause); // 设置解锁失败
return;
}

result.trySuccess(null);
});

return result;
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

lua脚本 解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil; // 这行貌似有点多余
  1. 如果锁不存在,或者不是自己的锁,则返回空。
  2. 如果锁的重入次数大于1,则重新设置锁的过期时间,返回0。
  3. 如果锁的重入次数等于1(不可能小于1),则删除锁,并且发布锁释放事件,返回1。

引出思考

  1. 为啥不先使用JVM的锁尝试加锁,加锁成功后再操作Redis,减少网络IO次数。
  2. 重入次数感觉没必要存入Redis,直接用ReentrantLock代替或者本地变量代替即可。
坚持原创技术分享,您的支持将鼓励我继续创作!