Semaphore是基于AQS共享锁来实现,默认采用非公平锁。Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。
Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,其维护了一个许可证集合,有多少资源限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
可以在构造方法中指定是公平锁还是非公平锁
使用案例
下面使用Semaphore进行限流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @org.junit.jupiter.api.Test void testSemaphore() throws InterruptedException { int nCpu = 8, queueCapacity = nCpu + 1; ThreadPoolExecutor executorService = new ThreadPoolExecutor(nCpu, nCpu, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
Semaphore semaphore = new Semaphore(4);
for (int i = 0; i < nCpu; i++) { executorService.execute(() -> { try { System.out.println("线程" + Thread.currentThread().getName() + "尝试获取令牌"); semaphore.acquire(); System.out.println("线程" + Thread.currentThread().getName() + "成功获取令牌,开始执行业务代码"); TimeUnit.SECONDS.sleep(2); System.out.println("线程" + Thread.currentThread().getName() + "执行业务代码完毕,随后释放令牌"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(1); executorService.shutdown(); while (Boolean.TRUE) { if (executorService.isTerminated()) { executorService.shutdown(); break; } TimeUnit.SECONDS.sleep(1); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| 线程pool-1-thread-1尝试获取令牌 线程pool-1-thread-2尝试获取令牌 线程pool-1-thread-2成功获取令牌,开始执行业务代码 线程pool-1-thread-1成功获取令牌,开始执行业务代码 线程pool-1-thread-3尝试获取令牌 线程pool-1-thread-5尝试获取令牌 线程pool-1-thread-5成功获取令牌,开始执行业务代码 线程pool-1-thread-3成功获取令牌,开始执行业务代码 线程pool-1-thread-4尝试获取令牌 线程pool-1-thread-6尝试获取令牌 线程pool-1-thread-7尝试获取令牌 线程pool-1-thread-8尝试获取令牌 线程pool-1-thread-2执行业务代码完毕,随后释放令牌 线程pool-1-thread-3执行业务代码完毕,随后释放令牌 线程pool-1-thread-1执行业务代码完毕,随后释放令牌 线程pool-1-thread-5执行业务代码完毕,随后释放令牌 线程pool-1-thread-7成功获取令牌,开始执行业务代码 线程pool-1-thread-6成功获取令牌,开始执行业务代码 线程pool-1-thread-4成功获取令牌,开始执行业务代码 线程pool-1-thread-8成功获取令牌,开始执行业务代码 线程pool-1-thread-7执行业务代码完毕,随后释放令牌 线程pool-1-thread-6执行业务代码完毕,随后释放令牌 线程pool-1-thread-8执行业务代码完毕,随后释放令牌 线程pool-1-thread-4执行业务代码完毕,随后释放令牌
|
构造方法
1 2 3 4 5 6
| public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
|
非公平锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) { super(permits); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
|
公平锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) { super(permits); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
|
可以发现,FairSync的tryAcquireShared方法与Sync的nonfairTryAcquireShared方法相比,只是多了以下代码:
1 2
| if (hasQueuedPredecessors()) return -1;
|