0%

深入理解LockSupport

LockSupport 是 JDK 中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。

前置知识

yield

对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

1
public static native void yield();

中断

一个线程执行完毕之后会自动结束,如果在运行过程中发生异常也会提前结束。

InterruptedException

通过调用一个线程的 interrupt() 来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。

通常一个方法声明了 InterruptedException,说明可以调用线程的 interrupt() 来停止该方法。例如:

1
2
// Thread 类的 sleep 方法
public static native void sleep(long millis) throws InterruptedException;

interrupted()

如果一个线程的 run() 方法执行一个无限循环,并且没有执行 sleep() 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt() 方法就无法使线程提前结束。

但是调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Demo1 {

public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
Long i = 0L;
while (!Thread.interrupted()) {
i++;
}
System.out.println("Thread interrupt, i = " + i);
});
thread.start();
Thread.sleep(100);
thread.interrupt();
}
}

输出:Thread interrupt, i = 17679919

interrupted() 与 isInterrupted()

interrupted()是静态方法,isInterrupted()是普通方法。

interrupted 静态方法会把原本线程的中断状态清除,而 isInterrupted 默认不会清除中断标记。

1
2
3
4
5
6
7
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
public boolean isInterrupted() {
return isInterrupted(false);
}
private native boolean isInterrupted(boolean ClearInterrupted);

Lost Wake-Up

线程 A 进入阻塞状态,接下来没有其他线程去唤醒线程 A,或者其他线程唤醒时机不对(早于线程 A 的 wait() ),导致线程 A 永远阻塞下去。

Java 的语法中规定:wait() notify() 方法必须放在同步块中,就是为了防止 Lost Wake-Up

以阻塞队列举例:如果队列为空,则消费者会被阻塞,如果生产者生产数据时发现之前队列为空,会唤醒所有正在等待的消费者(因为队列为空被阻塞)。

await() signalAll() 都方法必须放在 lock() 方法和 unlock() 方法中进行同步。假设如果不这样,会发生:

线程 A 尝试消费阻塞队列,此时阻塞队列为空,在执行 await() 方法之前,线程 B 生产数据,发现之前队列为空,则调用 signalAll() 方法唤醒被阻塞的线程。线程 A 在 线程 B 执行 signalAll() 方法后执行 await() 方法,那么线程 A 可能会永远的阻塞下去。

Spurious wakeup

使用条件锁就可能会产生虚假唤醒现象。

以阻塞队列举例:如果队列为空,此时有十个线程尝试消费队列,发现队列为空则会调用 await() 方法阻塞,生产者调用 signalAll() 方法唤醒被阻塞的线程,十个消费线程都被唤醒了,有一个线程成功消费数据导致队列再次为空,其他九个线程发现队列为空,虽然它们都被唤醒了但无法继续消费,这种现象被称为虚假唤醒。

因为存在虚假唤醒现象,必须将 await() / wait() 方法放在循环体中,例如:

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 使用while循环防止虚假唤醒
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

wikipedia中有关于spurious wakeups的提到:甚至在没有发生 signalAll() 、notifyAll() 的情况下,都有可能发生虚假唤醒。

基本原理

LockSupport 方法底层都是调用 Unsafe 的方法实现。全名sun.misc.Unsafe,该类可以直接操控内存,被JDK广泛用于自己的包中,如 java.nio 和 java.util.concurrent。但是不建议在生产环境中使用这个类。因为这个API十分不安全、不轻便、而且不稳定。

LockSupport 提供 park() 和 unpark() 方法实现阻塞线程和解除线程阻塞,LockSupport 和每个使用它的线程都与一个许可(permit)关联。unpark() 可以先于 park() 之前调用。

permit是相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就会将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用 unpark 会把 permit 置为1。

每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。

park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。

使用示例

先执行 unpark 方法再执行 park 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Demo {

public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.park();
});
thread.start();
LockSupport.unpark(thread);
}
}

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public class LockSupport {
private LockSupport() {} // Cannot be instantiated.

// parkBlocker 线程被阻塞的时候被记录,以便监视和诊断工具来识别线程被阻塞的原因
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}

// 返回提供给最近一次尚未解除阻塞的 park 方法调用的 blocker 对象
// 如果该调用不受阻塞,则返回 null
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}

// 将许可值设置为1,同时判断之前的值是不是0,如果是则唤醒设置为0的线程
// 如果给定线程尚未启动,则无法保证此操作有任何效果
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

// 如果许可值为1,则消费该许可(将其设置为0)并且立刻返回
// 如果许可值为0,则阻塞该线程直到下面三种情况发生:
// 1. 其他某个线程将当前线程作为目标调用 unpark
// 2. 其他某个线程中断当前线程
// 3. 该调用被虚假唤醒
public static void park() {
UNSAFE.park(false, 0L);
}

// 和park方法类似,多了一种被唤醒的情况:4.经过了指定的等待时间
// nanos是指相对时间,还需等待多久
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}

// 和park方法类似,多了一种被唤醒的情况:4.指定的截止时间过去了
// deadline是指绝对时间,等待的截止时间
public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}

// 和park()方法类似,只不过增加了暂停的同步对象
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null); // 被唤醒后再将 blocker 设置为空
}

// parkNanos(long nanos)方法类似,只不过增加了暂停的同步对象
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null); // 被唤醒后再将 blocker 设置为空
}
}

// parkUntil(long deadline)方法类似,只不过增加了暂停的同步对象
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null); // 被唤醒后再将 blocker 设置为空
}

static final int nextSecondarySeed() {
int r;
Thread t = Thread.currentThread();
if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
r ^= r << 13; // xorshift
r ^= r >>> 17;
r ^= r << 5;
}
else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
r = 1; // avoid zero
UNSAFE.putInt(t, SECONDARY, r);
return r;
}

// Hotspot implementation via intrinsics API
private static final sun.misc.Unsafe UNSAFE;
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
parkBlockerOffset = UNSAFE.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception ex) { throw new Error(ex); }
}
}

park 虚假唤醒

LockSupport 的 part 方法存在虚假唤醒(spurious wakeup)的情况。

由于虚假唤醒的存在,在调用 park 时一般采用自旋的方式:

1
2
3
4
while (!canProceed()) { 
// ...
LockSupport.park(this);
}

虚假唤醒的具体原因需要结合 c++ 代码分析。

利用LockSupport实现先进先出锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class FIFOMutex {

private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters
= new ConcurrentLinkedQueue<Thread>();

public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);

// Block while not first in queue or cannot acquire lock
while (waiters.peek() != current ||
!locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) // ignore interrupts while waiting
wasInterrupted = true;
}

waiters.remove();
if (wasInterrupted) // reassert interrupt status on exit
current.interrupt();
}

public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}

先进先出锁就是先申请锁的线程最先获得锁的资源,实现上采用了队列再加上LockSupport.park。

  1. 将当前调用lock的线程加入队列
  2. 如果等待队列的队首元素不是当前线程或者locked为true,则说明有线程已经持有了锁,那么调用park阻塞其余的线程。
  3. 如果队首元素是当前线程且locked为false,则说明前面已经没有人持有锁,删除队首元素也就是当前的线程,然后当前线程继续正常执行。
  4. 执行完后调用unlock方法将锁变量修改为false,并解除队首线程的阻塞状态。此时的队首元素继续之前的判断。

FIFOMutex为什么需要AtomicBoolean变量

FIFOMutex 中的 lock 方法中 调用 LockSupport.park 使用了 while 来防止虚假唤醒,那么这个 AtomicBoolean变量的作用是什么呢?其实同样还是防止虚假唤醒。因为获取锁资源的线程会执行 waiters.remove(),所以单靠 waiters.peek() != current 这个条件是无法判断能否获取锁资源。

如果去掉该变量,则可能出现:线程A正常运行,这个时候 waiter.peek() 是线程B,线程B被虚假唤醒了,如果没有后面cas,线程B就在没有拿到锁的情况下运行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);

// Block while not first in queue or cannot acquire lock
while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
LockSupport.park(this);

// ignore interrupts while waiting
if (Thread.interrupted()) {
wasInterrupted = true;
}
}

waiters.remove();
// reassert interrupt status on exit
if (wasInterrupted) {
current.interrupt();
}
}

小结

  1. unpark() 可以先于 park() 之前调用。
  2. LockSupport 不可重入
  3. LockSupport 可中断
  4. park() 存在虚假唤醒的情况,需结合 while 使用

park/unpark和 wait/notify区别

  1. wait和notify方法必须和同步锁 synchronized一块儿使用。而park/unpark使用就比较灵活了,没有这个限制,可以在任何地方使用。
  2. park/unpark 使用时没有先后顺序,都可以使线程不阻塞(前面代码已验证)。而wait必须在notify前先使用,如果先notify,再wait,则线程会一直等待。
  3. notify只能随机释放一个线程,并不能指定某个特定线程,notifyAll是释放锁对象中的所有线程。而unpark方法可以唤醒指定的线程。
  4. 调用wait方法会使当前线程释放锁资源,但使用的前提是必须已经获得了锁。 而park不会释放锁资源。(指的是LockSupport和synchronized一起使用)
坚持原创技术分享,您的支持将鼓励我继续创作!