/** The lock for guarding barrier entry */ privatefinal ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ privatefinal Condition trip = lock.newCondition(); /** The number of parties */ privatefinalint parties; // 表示需要拦截的线程数 /* The command to run when tripped */ privatefinal Runnable barrierCommand; // 设置了这个,代表越过栅栏之前要执行相应的操作(一代只执行一次) /** The current generation */ private Generation generation = new Generation(); // 当前所处的代或周期
/** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ // 还没有到栅栏的线程数,这个值初始为 parties,然后递减 privateint count; privatestaticclassGeneration{ boolean broken = false; // 是否被打破 }
publicintawait()throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { thrownew Error(toe); // cannot happen } } /** * Main barrier code, covering the various policies. */ privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); // 获取独占锁 try { final Generation g = generation; // 若栅栏已被打破,抛出BrokenBarrierException异常 if (g.broken) thrownew BrokenBarrierException(); // 只要有1个线程被中断,则打破栅栏 if (Thread.interrupted()) { breakBarrier(); thrownew InterruptedException(); } // 对count执行减1操作,因为已经获取了独占锁,所以不用进行cas操作 int index = --count; // 最后一个到达栅栏的线程,才会执行下述代码 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 若barrierAction不为null,则执行barrierAction if (command != null) command.run(); ranAction = true; // 创建下一代栅栏 nextGeneration(); return0; } finally { if (!ranAction) // 如果执行barrierAction发送异常,则打破栅栏 breakBarrier(); } }
// loop until tripped, broken, interrupted, or timed out // 只要不是最后一个线程,就执行自旋,直到栅栏被打破、线程被中断或等待超时 for (;;) { try { if (!timed) trip.await(); // 不带超时机制调用condition的await方法 elseif (nanos > 0L) nanos = trip.awaitNanos(nanos); // 带超时机制调用awaitNanos方法 } catch (InterruptedException ie) { // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断 if (g == generation && ! g.broken) { breakBarrier(); //打破栅栏 throw ie; // 重新抛出InterruptedException异常给外层调用的方法 } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 唤醒后,检查栅栏是否已被打破 if (g.broken) thrownew BrokenBarrierException();
/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ privatevoidnextGeneration(){ // signal completion of last generation trip.signalAll(); // 将阻塞在trip上的线程依次唤醒 // set up next generation count = parties; // 更新 count 的值 generation = new Generation(); // 创建下一代栅栏 }
breakBarrier方法
打破栅栏
1 2 3 4 5 6 7 8 9
/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ privatevoidbreakBarrier(){ generation.broken = true; // 设置状态 broken 为 true count = parties; trip.signalAll(); // 将阻塞在trip上的线程依次唤醒 }
reset方法
重置栅栏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */ publicvoidreset(){ final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }