LockSupport 是 JDK 中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。
前置知识 yield 对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。
1 public static native void yield () ;
中断 一个线程执行完毕之后会自动结束,如果在运行过程中发生异常也会提前结束。
InterruptedException 通过调用一个线程的 interrupt() 来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞。
通常一个方法声明了 InterruptedException
,说明可以调用线程的 interrupt() 来停止该方法。例如:
1 2 public static native void sleep (long millis) throws InterruptedException ;
interrupted() 如果一个线程的 run() 方法执行一个无限循环,并且没有执行 sleep() 等会抛出 InterruptedException 的操作,那么调用线程的 interrupt() 方法就无法使线程提前结束。
但是调用 interrupt() 方法会设置线程的中断标记,此时调用 interrupted() 方法会返回 true。因此可以在循环体中使用 interrupted() 方法来判断线程是否处于中断状态,从而提前结束线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Demo1 { public static void main (String[] args) throws InterruptedException { Thread thread = new Thread(() -> { Long i = 0L ; while (!Thread.interrupted()) { i++; } System.out.println("Thread interrupt, i = " + i); }); thread.start(); Thread.sleep(100 ); thread.interrupt(); } }
输出:Thread interrupt, i = 17679919
interrupted() 与 isInterrupted() interrupted()是静态方法,isInterrupted()是普通方法。
interrupted 静态方法会把原本线程的中断状态清除,而 isInterrupted 默认不会清除中断标记。
1 2 3 4 5 6 7 public static boolean interrupted () { return currentThread().isInterrupted(true ); } public boolean isInterrupted () { return isInterrupted(false ); } private native boolean isInterrupted (boolean ClearInterrupted) ;
Lost Wake-Up 线程 A 进入阻塞状态,接下来没有其他线程去唤醒线程 A,或者其他线程唤醒时机不对(早于线程 A 的 wait()
),导致线程 A 永远阻塞下去。
Java 的语法中规定:wait() notify() 方法必须放在同步块中,就是为了防止 Lost Wake-Up 。
以阻塞队列举例:如果队列为空,则消费者会被阻塞,如果生产者生产数据时发现之前队列为空,会唤醒所有正在等待的消费者(因为队列为空被阻塞)。
await() signalAll() 都方法必须放在 lock() 方法和 unlock() 方法中进行同步。假设如果不这样,会发生:
线程 A 尝试消费阻塞队列,此时阻塞队列为空,在执行 await() 方法之前,线程 B 生产数据,发现之前队列为空,则调用 signalAll() 方法唤醒被阻塞的线程。线程 A 在 线程 B 执行 signalAll() 方法后执行 await() 方法,那么线程 A 可能会永远的阻塞下去。
Spurious wakeup 使用条件锁就可能会产生虚假唤醒现象。
以阻塞队列举例:如果队列为空,此时有十个线程尝试消费队列,发现队列为空则会调用 await() 方法阻塞,生产者调用 signalAll() 方法唤醒被阻塞的线程,十个消费线程都被唤醒了,有一个线程成功消费数据导致队列再次为空,其他九个线程发现队列为空,虽然它们都被唤醒了但无法继续消费,这种现象被称为虚假唤醒。
因为存在虚假唤醒现象,必须将 await() / wait() 方法放在循环体中,例如:
1 2 3 4 5 6 7 8 9 10 11 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
wikipedia中有关于spurious wakeups 的提到:甚至在没有发生 signalAll() 、notifyAll() 的情况下,都有可能发生虚假唤醒。
基本原理 LockSupport 方法底层都是调用 Unsafe 的方法实现。全名sun.misc.Unsafe,该类可以直接操控内存,被JDK广泛用于自己的包中,如 java.nio 和 java.util.concurrent。但是不建议在生产环境中使用这个类。因为这个API十分不安全、不轻便、而且不稳定。
LockSupport 提供 park() 和 unpark() 方法实现阻塞线程和解除线程阻塞,LockSupport 和每个使用它的线程都与一个许可(permit)关联。unpark() 可以先于 park() 之前调用。
permit是相当于1,0的开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就会将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0了,会阻塞在这里,直到permit变为1), 这时调用 unpark 会把 permit 置为1。
每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。
park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。
使用示例 先执行 unpark 方法再执行 park 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Demo { public static void main (String[] args) throws InterruptedException { Thread thread = new Thread(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.park(); }); thread.start(); LockSupport.unpark(thread); } }
源码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 public class LockSupport { private LockSupport () {} private static void setBlocker (Thread t, Object arg) { UNSAFE.putObject(t, parkBlockerOffset, arg); } public static Object getBlocker (Thread t) { if (t == null ) throw new NullPointerException(); return UNSAFE.getObjectVolatile(t, parkBlockerOffset); } public static void unpark (Thread thread) { if (thread != null ) UNSAFE.unpark(thread); } public static void park () { UNSAFE.park(false , 0L ); } public static void parkNanos (long nanos) { if (nanos > 0 ) UNSAFE.park(false , nanos); } public static void parkUntil (long deadline) { UNSAFE.park(true , deadline); } public static void park (Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false , 0L ); setBlocker(t, null ); } public static void parkNanos (Object blocker, long nanos) { if (nanos > 0 ) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false , nanos); setBlocker(t, null ); } } public static void parkUntil (Object blocker, long deadline) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(true , deadline); setBlocker(t, null ); } static final int nextSecondarySeed () { int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0 ) { r ^= r << 13 ; r ^= r >>> 17 ; r ^= r << 5 ; } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0 ) r = 1 ; UNSAFE.putInt(t, SECONDARY, r); return r; } private static final sun.misc.Unsafe UNSAFE; private static final long parkBlockerOffset; private static final long SEED; private static final long PROBE; private static final long SECONDARY; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class ; parkBlockerOffset = UNSAFE.objectFieldOffset (tk.getDeclaredField("parkBlocker" )); SEED = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSeed" )); PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe" )); SECONDARY = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSecondarySeed" )); } catch (Exception ex) { throw new Error(ex); } } }
park 虚假唤醒 LockSupport 的 part 方法存在虚假唤醒(spurious wakeup)的情况。
由于虚假唤醒的存在,在调用 park 时一般采用自旋的方式:
1 2 3 4 while (!canProceed()) { LockSupport.park(this ); }
虚假唤醒的具体原因需要结合 c++ 代码分析。
利用LockSupport实现先进先出锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class FIFOMutex { private final AtomicBoolean locked = new AtomicBoolean(false ); private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>(); public void lock () { boolean wasInterrupted = false ; Thread current = Thread.currentThread(); waiters.add(current); while (waiters.peek() != current || !locked.compareAndSet(false , true )) { LockSupport.park(this ); if (Thread.interrupted()) wasInterrupted = true ; } waiters.remove(); if (wasInterrupted) current.interrupt(); } public void unlock () { locked.set(false ); LockSupport.unpark(waiters.peek()); } }
先进先出锁就是先申请锁的线程最先获得锁的资源,实现上采用了队列再加上LockSupport.park。
将当前调用lock的线程加入队列
如果等待队列的队首元素不是当前线程或者locked为true,则说明有线程已经持有了锁,那么调用park阻塞其余的线程。
如果队首元素是当前线程且locked为false,则说明前面已经没有人持有锁,删除队首元素也就是当前的线程,然后当前线程继续正常执行。
执行完后调用unlock方法将锁变量修改为false,并解除队首线程的阻塞状态。此时的队首元素继续之前的判断。
FIFOMutex为什么需要AtomicBoolean变量 FIFOMutex 中的 lock 方法中 调用 LockSupport.park 使用了 while 来防止虚假唤醒,那么这个 AtomicBoolean变量的作用是什么呢?其实同样还是防止虚假唤醒 。因为获取锁资源的线程会执行 waiters.remove(),所以单靠 waiters.peek() != current 这个条件是无法判断能否获取锁资源。
如果去掉该变量,则可能出现:线程A正常运行,这个时候 waiter.peek() 是线程B,线程B被虚假唤醒了,如果没有后面cas,线程B就在没有拿到锁的情况下运行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void lock () { boolean wasInterrupted = false ; Thread current = Thread.currentThread(); waiters.add(current); while (waiters.peek() != current || !locked.compareAndSet(false , true )) { LockSupport.park(this ); if (Thread.interrupted()) { wasInterrupted = true ; } } waiters.remove(); if (wasInterrupted) { current.interrupt(); } }
小结
unpark() 可以先于 park() 之前调用。
LockSupport 不可重入
LockSupport 可中断
park() 存在虚假唤醒的情况,需结合 while 使用
park/unpark和 wait/notify区别
wait和notify方法必须和同步锁 synchronized一块儿使用。而park/unpark使用就比较灵活了,没有这个限制,可以在任何地方使用。
park/unpark 使用时没有先后顺序,都可以使线程不阻塞(前面代码已验证)。而wait必须在notify前先使用,如果先notify,再wait,则线程会一直等待。
notify只能随机释放一个线程,并不能指定某个特定线程,notifyAll是释放锁对象中的所有线程。而unpark方法可以唤醒指定的线程。
调用wait方法会使当前线程释放锁资源,但使用的前提是必须已经获得了锁。 而park不会释放锁资源。(指的是LockSupport和synchronized一起使用)