A Brief Look at Exchanger and Phaser

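Exchanger lets two threads swap data with each other at a synchronization point.

JDK 7 added a new synchronization utility, Phaser, which is more powerful than CyclicBarrier and CountDownLatch: it supports multiple phases and lets parties register and deregister dynamically.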

Exchanger

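Like Lock, Exchanger is built on CAS plus park/unpark: threads coordinate through atomic compare-and-set operations and block or wake each other with LockSupport.park/unpark.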
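To make the CAS + park/unpark idea concrete, here is a minimal sketch of a single-slot exchanger. It is not the JDK implementation: `SimpleExchanger`, `Node`, and `SimpleExchangerDemo` are hypothetical names, and the sketch assumes no interrupt or timeout handling (an interrupted waiter simply keeps re-checking until it is matched).

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical single-slot exchanger; a teaching sketch, NOT the JDK's Exchanger. */
class SimpleExchanger<V> {
    private static final class Node<V> {
        final V item;              // value offered by the waiting thread
        final Thread waiter;       // thread to wake once a partner shows up
        volatile V match;          // value handed over by the partner
        volatile boolean matched;  // written last, so match is visible once this is true
        Node(V item, Thread waiter) { this.item = item; this.waiter = waiter; }
    }

    private final AtomicReference<Node<V>> slot = new AtomicReference<>();

    V exchange(V x) {
        Node<V> me = null;
        for (;;) {
            Node<V> other = slot.get();
            if (other == null) {
                // Slot is empty: try to occupy it with CAS, then park until matched.
                if (me == null) me = new Node<>(x, Thread.currentThread());
                if (slot.compareAndSet(null, me)) {
                    while (!me.matched) LockSupport.park(this); // loop guards against spurious wakeups
                    return me.match;
                }
            } else if (slot.compareAndSet(other, null)) {
                // Slot is occupied: claim the waiter with CAS, hand over our value, wake it.
                other.match = x;
                other.matched = true;
                LockSupport.unpark(other.waiter);
                return other.item;
            }
            // CAS lost a race with another thread: retry.
        }
    }
}

class SimpleExchangerDemo {
    /** Swaps a and b between the calling thread and a helper thread. */
    static String[] swap(String a, String b) {
        SimpleExchanger<String> ex = new SimpleExchanger<>();
        String[] got = new String[2];
        Thread t = new Thread(() -> got[1] = ex.exchange(b));
        t.start();
        got[0] = ex.exchange(a);
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return got;
    }

    public static void main(String[] args) {
        String[] got = swap("from-main", "from-worker");
        System.out.println("main got " + got[0] + ", worker got " + got[1]);
    }
}
```

Whichever thread wins the first CAS becomes the waiter and parks; the other thread claims it with a second CAS and calls unpark. The real Exchanger adds interrupt and timeout support and spreads contention over several slots.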

Usage example

import java.util.concurrent.Exchanger;

public class Demo {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        // Every thread runs the same task: offer its own name and print the name it receives.
        Runnable task = () -> {
            try {
                String data = Thread.currentThread().getName();
                data = exchanger.exchange(data);
                System.out.println(Thread.currentThread().getName() + " = " + data);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        };
        // Start an even number of threads; an unpaired thread would block in exchange() forever.
        for (int i = 1; i <= 4; i++) {
            new Thread(task, "thread" + i).start();
        }
    }
}

Output (which threads pair up may differ from run to run):

thread2 = thread1
thread1 = thread2
thread4 = thread3
thread3 = thread4
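Because threads are matched in pairs, a thread that never finds a partner stays blocked in exchange(). The timed overload `exchange(x, timeout, unit)` avoids that by throwing TimeoutException. The sketch below shows one way to use it; `TimedExchangeDemo` and `exchangeOrFallback` are hypothetical names, and returning a fallback value is just one possible policy.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedExchangeDemo {
    /** Returns the partner's value, or fallback if no partner arrives within millis. */
    static String exchangeOrFallback(Exchanger<String> ex, String value,
                                     long millis, String fallback) {
        try {
            return ex.exchange(value, millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback; // nobody showed up in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return fallback;
        }
    }

    public static void main(String[] args) {
        Exchanger<String> ex = new Exchanger<>();
        // No other thread ever calls exchange(), so this times out after 200 ms.
        System.out.println(exchangeOrFallback(ex, "lonely", 200, "no partner"));
    }
}
```

Run as is, the program prints `no partner` after roughly 200 ms instead of hanging.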

Phaser

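Phaser is not implemented on top of AQS, but it has the same core ingredients as AQS: a state variable, CAS operations, and a queue of blocked threads.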
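The internal state is not directly accessible, but its components can be observed through public getters. The following sketch (`PhaserStateDemo` and `snapshot` are hypothetical names) shows how the phase number and the arrived/unarrived counts change as parties arrive.

```java
import java.util.concurrent.Phaser;

class PhaserStateDemo {
    /** Formats the observable parts of a phaser's state. */
    static String snapshot(Phaser p) {
        return "phase=" + p.getPhase()
                + " registered=" + p.getRegisteredParties()
                + " arrived=" + p.getArrivedParties()
                + " unarrived=" + p.getUnarrivedParties();
    }

    public static void main(String[] args) {
        Phaser p = new Phaser(3);        // three registered parties, phase 0
        System.out.println(snapshot(p)); // phase=0 registered=3 arrived=0 unarrived=3

        p.arrive();                      // one party arrives without waiting
        System.out.println(snapshot(p)); // phase=0 registered=3 arrived=1 unarrived=2

        p.arrive();
        p.arrive();                      // last arrival advances the phase and resets the counts
        System.out.println(snapshot(p)); // phase=1 registered=3 arrived=0 unarrived=3
    }
}
```

Each arrive() is a CAS-style update of that state; the arrival that brings the unarrived count to zero advances the phase and releases any waiting threads.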

Replacing CountDownLatch

One main thread has to wait until 10 worker threads have finished.

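import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo3 {

    public static void main(String[] args) {
        int nCore = Runtime.getRuntime().availableProcessors() * 2;
        int nQueue = 10;
        ExecutorService executorService = new ThreadPoolExecutor(nCore, nCore,
                0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(nQueue),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        Phaser master = new Phaser(1);       // start gate: opened by a single arrive() from main
        Phaser worker = new Phaser(nQueue);  // finish gate: opened once every worker has arrived

        // Capture both phase numbers up front. Calling getPhase() at the moment of waiting is racy:
        // if the phaser has already advanced, awaitAdvance() would wait for the NEXT phase and
        // block forever (for example, tasks that start only after master.arrive() has run).
        int startPhase = master.getPhase();
        int finishPhase = worker.getPhase();

        for (int i = 0; i < nQueue; i++) {
            executorService.execute(() -> {
                master.awaitAdvance(startPhase); // returns immediately if the gate is already open
                System.out.println(Thread.currentThread().getName() + " arrive");
                worker.arrive();
            });
        }
        master.arrive();                      // open the start gate
        worker.awaitAdvance(finishPhase);     // wait for all workers
        System.out.println("main");
        executorService.shutdown();
    }
}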

Output:

pool-1-thread-1 arrive
pool-1-thread-4 arrive
pool-1-thread-5 arrive
pool-1-thread-3 arrive
pool-1-thread-2 arrive
pool-1-thread-6 arrive
pool-1-thread-8 arrive
pool-1-thread-7 arrive
pool-1-thread-9 arrive
pool-1-thread-10 arrive
main
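If no start gate is needed, the mapping to CountDownLatch becomes very direct: arrive() plays the role of countDown() and awaitAdvance(0) plays the role of await(). The sketch below assumes at least one worker; `PhaserAsLatchDemo` and `runWorkers` are hypothetical names, and the fixed pool of 4 threads is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserAsLatchDemo {
    /** Runs nWorkers (>= 1) tasks and returns how many had finished when the caller was released. */
    static int runWorkers(int nWorkers) {
        Phaser done = new Phaser(nWorkers);          // like new CountDownLatch(nWorkers)
        AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < nWorkers; i++) {
            pool.execute(() -> {
                finished.incrementAndGet();          // the "work"
                done.arrive();                       // like latch.countDown(): never blocks
            });
        }
        // Like latch.await(): a new phaser starts in phase 0, so this blocks until
        // all parties have arrived and the phase has moved on to 1.
        done.awaitAdvance(0);
        pool.shutdown();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished = " + runWorkers(10));
    }
}
```

Passing the literal phase 0 (rather than calling getPhase() at wait time) keeps the wait correct even if every worker has already arrived before the main thread reaches awaitAdvance.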

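Replacing CyclicBarrier

Example: 4 engineers apply for a job at a company and go through three stages together: 1. arriving at the company, 2. the written test, 3. the interview.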

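import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo2 {

    public static void main(String[] args) throws InterruptedException {
        int nQueue = 4;
        // All parties must run at the same time; with fewer threads than parties,
        // the running tasks would wait at the barrier forever.
        int nCore = Math.max(nQueue, Runtime.getRuntime().availableProcessors() * 2);
        ExecutorService executorService = new ThreadPoolExecutor(nCore, nCore,
                0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(nQueue),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        Phaser phaser = new Phaser(nQueue);

        for (int i = 0; i < nQueue; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " arrived");
                phaser.arriveAndAwaitAdvance(); // wait until everyone has arrived
                System.out.println(Thread.currentThread().getName() + " written test");
                phaser.arriveAndAwaitAdvance(); // wait until everyone has finished the written test
                System.out.println(Thread.currentThread().getName() + " interview");
            });
        }
        // Not strictly required: shutdown() lets already submitted tasks run to completion.
        TimeUnit.SECONDS.sleep(1);
        executorService.shutdown();
    }
}

Output:

pool-1-thread-1 arrived
pool-1-thread-4 arrived
pool-1-thread-3 arrived
pool-1-thread-2 arrived
pool-1-thread-2 written test
pool-1-thread-3 written test
pool-1-thread-4 written test
pool-1-thread-1 written test
pool-1-thread-1 interview
pool-1-thread-3 interview
pool-1-thread-2 interview
pool-1-thread-4 interview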
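Phaser also covers two things CyclicBarrier is often used for or cannot do: a per-phase hook (overriding onAdvance, similar to a barrier action) and parties that leave midway (arriveAndDeregister). The sketch below extends the interview scenario under an assumption that is not part of the original example: candidate 0 drops out after the written test. `InterviewPhaserDemo`, `run`, and the log format are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

class InterviewPhaserDemo {
    /** Runs the three stages with the given number of candidates (>= 1) and returns the phase log. */
    static List<String> run(int candidates) {
        List<String> log = new CopyOnWriteArrayList<>();
        String[] stages = {"arrival", "written test", "interview"};
        Phaser phaser = new Phaser(candidates) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Called once per phase by the last arriving party.
                log.add(stages[phase] + " finished, parties=" + registeredParties);
                // Returning true terminates the phaser: after the last stage or when nobody is left.
                return phase == stages.length - 1 || registeredParties == 0;
            }
        };
        Thread[] threads = new Thread[candidates];
        for (int i = 0; i < candidates; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                phaser.arriveAndAwaitAdvance();   // stage 0: everyone arrives
                if (id == 0) {
                    // Assumption for illustration: candidate 0 leaves after the written test.
                    phaser.arriveAndDeregister(); // counts as arrival for stage 1, then leaves
                    return;
                }
                phaser.arriveAndAwaitAdvance();   // stage 1: written test
                phaser.arriveAndAwaitAdvance();   // stage 2: interview
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run(4).forEach(System.out::println);
    }
}
```

With 4 candidates the log shows 4 parties for the arrival stage and 3 for the written test and the interview, since the deregistered candidate no longer counts toward later phases.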