Exchanger lets two threads swap data with each other.
JDK 7 added a synchronization utility class, Phaser, which is more capable than CyclicBarrier and CountDownLatch.
Exchanger
Exchanger's core mechanism is the same as Lock's: CAS plus park/unpark.
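To make the CAS plus park/unpark idea concrete, here is a minimal single-slot sketch. It is not the JDK implementation: the class name `SlotExchangerSketch` and its `Node` type are hypothetical, and the sketch assumes no timeouts, no interrupt handling, and no contention arena.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical teaching sketch, not java.util.concurrent.Exchanger.
class SlotExchangerSketch<V> {
    // Published into the slot by the first thread of a pair.
    private static final class Node<V> {
        final V item;              // value offered by the waiting thread
        final Thread waiter;       // thread to unpark once a partner arrives
        volatile V match;          // value handed over by the partner
        volatile boolean matched;  // set after match has been written

        Node(V item) {
            this.item = item;
            this.waiter = Thread.currentThread();
        }
    }

    private final AtomicReference<Node<V>> slot = new AtomicReference<>();

    public V exchange(V item) {
        Node<V> me = new Node<>(item);
        for (;;) {
            Node<V> other = slot.get();
            if (other == null) {
                // Slot is empty: occupy it with CAS, then park until a partner matches us.
                if (slot.compareAndSet(null, me)) {
                    while (!me.matched) {      // loop guards against spurious wakeups
                        LockSupport.park(this);
                    }
                    return me.match;
                }
            } else if (slot.compareAndSet(other, null)) {
                // Slot is occupied: claim the waiter with CAS, hand over our item, wake it.
                other.match = item;
                other.matched = true;
                LockSupport.unpark(other.waiter);
                return other.item;
            }
            // A CAS lost a race with another thread: retry.
        }
    }
}
```

Because the sketch ignores interrupts, an interrupted waiter would spin instead of parking; the real class throws InterruptedException in that case.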
Usage example
    import java.util.concurrent.Exchanger;

    public class Demo {
        public static void main(String[] args) {
            Exchanger<String> exchanger = new Exchanger<>();

            // Every thread runs the same task: offer its own name,
            // block until a partner arrives, then print the name it received.
            Runnable task = () -> {
                try {
                    String data = Thread.currentThread().getName();
                    data = exchanger.exchange(data);
                    System.out.println(Thread.currentThread().getName() + " = " + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };

            // Four threads form two pairs; which threads pair up depends on scheduling.
            new Thread(task, "thread1").start();
            new Thread(task, "thread2").start();
            new Thread(task, "thread3").start();
            new Thread(task, "thread4").start();
        }
    }
Output (the pairing and the order can differ from run to run):
    thread2 = thread1
    thread1 = thread2
    thread4 = thread3
    thread3 = thread4
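The example works because four threads form two complete pairs. With an odd number of threads, one of them would block in `exchange` indefinitely. The sketch below uses the timed overload `exchange(V, long, TimeUnit)` so a thread without a partner gives up; the class name `ExchangeTimeoutSketch`, the helper `exchangeOrFallback`, and the fallback value are illustrative assumptions.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExchangeTimeoutSketch {
    // Offers item and returns the partner's item, or fallback if no partner
    // arrives within timeoutMillis (or the thread is interrupted while waiting).
    static String exchangeOrFallback(Exchanger<String> exchanger, String item,
                                     long timeoutMillis, String fallback) {
        try {
            return exchanger.exchange(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return fallback;
        }
    }

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        // Only one thread calls exchange here, so it has no partner and times out.
        System.out.println(exchangeOrFallback(exchanger, "lonely", 100, "no partner"));
    }
}
```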
Phaser
Phaser is not built on AQS, but it has the same core ingredients as AQS: a state variable, CAS operations, and a blocking queue.
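The state mentioned above can be observed through Phaser's public getters. The following sketch runs on a single thread and only shows how the phase number and the arrival counts change as parties arrive; the class name `PhaserStateSketch` and the `snapshot` helper are illustrative.

```java
import java.util.concurrent.Phaser;

class PhaserStateSketch {
    // Formats the observable parts of a Phaser's state.
    static String snapshot(Phaser p) {
        return "phase=" + p.getPhase()
                + " registered=" + p.getRegisteredParties()
                + " arrived=" + p.getArrivedParties()
                + " unarrived=" + p.getUnarrivedParties();
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(2);
        System.out.println(snapshot(phaser)); // phase=0 registered=2 arrived=0 unarrived=2

        phaser.arrive();                      // first party arrives, does not block
        System.out.println(snapshot(phaser)); // phase=0 registered=2 arrived=1 unarrived=1

        phaser.arrive();                      // last party arrives: phase advances, counts reset
        System.out.println(snapshot(phaser)); // phase=1 registered=2 arrived=0 unarrived=2
    }
}
```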
Replacing CountDownLatch
One main thread waits until 10 worker threads have finished before it continues.
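    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class Demo3 {
        public static void main(String[] args) throws InterruptedException {
            int nCore = Runtime.getRuntime().availableProcessors() * 2;
            int nQueue = 10;
            ExecutorService executorService = new ThreadPoolExecutor(nCore, nCore, 0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(nQueue), Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            Phaser master = new Phaser(1);      // start gate, opened by the main thread
            Phaser worker = new Phaser(nQueue); // completion latch, one arrival per worker

            for (int i = 0; i < nQueue; i++) {
                executorService.execute(() -> {
                    // Wait for phase 0 explicitly. Calling getPhase() here could return 1
                    // once the gate is open, and the task would then block forever.
                    master.awaitAdvance(0);
                    System.out.println(Thread.currentThread().getName() + " arrive");
                    worker.arrive();
                });
            }
            TimeUnit.SECONDS.sleep(1);
            master.arrive();        // open the gate: phase 0 -> 1
            worker.awaitAdvance(0); // block until all workers have arrived in phase 0
            System.out.println("main");
            executorService.shutdown();
        }
    }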
Output:
    pool-1-thread-1 arrive
    pool-1-thread-4 arrive
    pool-1-thread-5 arrive
    pool-1-thread-3 arrive
    pool-1-thread-2 arrive
    pool-1-thread-6 arrive
    pool-1-thread-8 arrive
    pool-1-thread-7 arrive
    pool-1-thread-9 arrive
    pool-1-thread-10 arrive
    main
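For comparison, the same start-gate and completion pattern can be written with two CountDownLatch instances. This is a sketch for side-by-side reading, not part of the original example; the class name `LatchEquivalentSketch`, the method `runWorkers`, and the use of plain threads instead of a pool are assumptions made to keep it short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchEquivalentSketch {
    // Starts workerCount threads, releases them together, waits for all of them,
    // and returns how many completed their work.
    static int runWorkers(int workerCount) throws InterruptedException {
        CountDownLatch startGate = new CountDownLatch(1);      // role of the "master" Phaser
        CountDownLatch done = new CountDownLatch(workerCount); // role of the "worker" Phaser
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                try {
                    startGate.await();           // counterpart of master.awaitAdvance(...)
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();            // counterpart of worker.arrive()
                }
            }).start();
        }
        startGate.countDown();                   // counterpart of master.arrive()
        done.await();                            // counterpart of worker.awaitAdvance(...)
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(10) + " workers finished");
    }
}
```

One practical difference: a CountDownLatch cannot be reset, while a Phaser moves on to the next phase and can be reused.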
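Replacing CyclicBarrier
Example: four engineers apply for a job at a company and go through three stages together: 1. arriving at the company, 2. the written test, 3. the interview.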
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class Demo2 {
        public static void main(String[] args) throws InterruptedException {
            int nQueue = 4;
            // The pool needs at least nQueue threads: every party blocks at the barrier,
            // so a smaller pool would leave queued tasks unstarted and deadlock.
            int nCore = Math.max(Runtime.getRuntime().availableProcessors() * 2, nQueue);
            ExecutorService executorService = new ThreadPoolExecutor(nCore, nCore, 0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(nQueue), Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
            Phaser phaser = new Phaser(nQueue);

            for (int i = 0; i < nQueue; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " arrived");
                    phaser.arriveAndAwaitAdvance(); // wait until everyone has arrived
                    System.out.println(Thread.currentThread().getName() + " written test");
                    phaser.arriveAndAwaitAdvance(); // wait until everyone has finished the written test
                    System.out.println(Thread.currentThread().getName() + " interview");
                });
            }
            TimeUnit.SECONDS.sleep(1);
            executorService.shutdown();
        }
    }

Output:

    pool-1-thread-1 arrived
    pool-1-thread-4 arrived
    pool-1-thread-3 arrived
    pool-1-thread-2 arrived
    pool-1-thread-2 written test
    pool-1-thread-3 written test
    pool-1-thread-4 written test
    pool-1-thread-1 written test
    pool-1-thread-1 interview
    pool-1-thread-3 interview
    pool-1-thread-2 interview
    pool-1-thread-4 interview
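CyclicBarrier accepts a barrier action that runs each time all parties have arrived. With Phaser, a comparable hook is the protected `onAdvance` method, which also decides whether the phaser terminates. The sketch below overrides it to log each completed stage and to stop after the last one; the class name `PhaserBarrierActionSketch`, the method `run`, and the stage names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

class PhaserBarrierActionSketch {
    // Runs `parties` threads through the given stages (assumes at least one stage)
    // and returns the messages logged by onAdvance, one per completed stage.
    static List<String> run(int parties, String... stages) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Phaser phaser = new Phaser(parties) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Called once per phase by the last arriving party, before the others are released.
                log.add(stages[phase] + " complete");
                // Returning true terminates the phaser after the final stage.
                return phase >= stages.length - 1 || registeredParties == 0;
            }
        };

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            Thread t = new Thread(() -> {
                while (!phaser.isTerminated()) {
                    phaser.arriveAndAwaitAdvance(); // one barrier per stage
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run(4, "arrival", "written test", "interview").forEach(System.out::println);
    }
}
```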