publicclassTimer{ /** * 优先队列,本质上是一个基于数组的小顶堆,顶堆是下一个需要执行的任务 * The timer task queue. This data structure is shared with the timer * thread. The timer produces tasks, via its various schedule calls, * and the timer thread consumes, executing timer tasks as appropriate, * and removing them from the queue when they're obsolete. */ privatefinal TaskQueue queue = new TaskQueue();
/** * 执行延迟任务的线程 * The timer thread. */ privatefinal TimerThread thread = new TimerThread(queue); /** * Creates a new timer whose associated thread has the specified name. * The associated thread does <i>not</i> * {@linkplain Thread#setDaemon run as a daemon}. * * @param name the name of the associated thread * @throws NullPointerException if {@code name} is null * @since 1.5 */ publicTimer(String name){ thread.setName(name); thread.start(); // 构造时默认启动执行线程 } }
/** * The main timer loop. (See class comment.) */ privatevoidmainLoop(){ while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); // 如果队列为空则一直等待,添加任务时则会通知 if (queue.isEmpty()) break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); // 不是周期性任务则直接移除 task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule // 是周期性任务则修改当前任务时间为下次执行的时间 queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } }
/** * Creates a new {@code ScheduledThreadPoolExecutor} with the * given core pool size. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ publicScheduledThreadPoolExecutor(int corePoolSize){ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } privateclassScheduledFutureTask<V> extendsFutureTask<V> implementsRunnableScheduledFuture<V> { // ... } }
privateclassScheduledFutureTask<V> extendsFutureTask<V> implementsRunnableScheduledFuture<V> { /** * Overrides FutureTask version so as to reset/requeue if periodic. */ publicvoidrun(){ boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); elseif (!periodic) // 如果不是周期性任务,直接运行 ScheduledFutureTask.super.run(); elseif (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); // 重设下一次执行任务时间 reExecutePeriodic(outerTask); // 重新入队 } } } // ScheduledFutureTask.super.run(); 父类FutureTask中的run方法 publicvoidrun(){ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { // 捕获异常 result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take()throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); // 如果队列为空则等待元素插入 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); // 小于等于0则说明时间到了,直接出队 first = null; // don't retain ref while waiting if (leader != null) // leader是减少不必要的等待,没抢到当leader线程都await() available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); // 等待时间到达 } finally { if (leader == thisThread) // 执行完了重置leader leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); // 唤醒想当leader的线程 lock.unlock(); } }
/** * Creates a new timer. * * @param threadFactory a {@link ThreadFactory} that creates a * background {@link Thread} which is dedicated to * {@link TimerTask} execution. * @param tickDuration the duration between tick * @param unit the time unit of the {@code tickDuration} * @param ticksPerWheel the size of the wheel * @param leakDetection {@code true} if leak detection should be enabled always, * if false it will only be enabled if the worker thread is * not a daemon thread. * @param maxPendingTimeouts The maximum number of pending timeouts after which call to * {@code newTimeout} will result in * {@link java.util.concurrent.RejectedExecutionException} * being thrown. No maximum pending timeouts limit is assumed * if this value is 0 or negative. */ publicHashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts){
// Normalize ticksPerWheel to power of two and initialize the wheel. // 和HashMap类似,时间轮大小为大于等于 ticksPerWheel 的第一个 2 的幂,方便后续通过位运算代替模运算 wheel = createWheel(ticksPerWheel); mask = wheel.length - 1;
// Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration);
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit){ ObjectUtil.checkNotNull(task, "task"); ObjectUtil.checkNotNull(unit, "unit"); // 等待的任务数+1 pendingTimeoutsCount是AtomicLong的实例,表示等待的任务数 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); // 如果时间轮内等待的任务数大于最大值,则会抛出异常 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); thrownew RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 开启时间轮内的线程 start();
// Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. // 计算当前添加任务的执行时间 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
/** * Starts the background thread explicitly. The background thread will * start automatically on demand even if you did not call this method. * * @throws IllegalStateException if this timer has been * {@linkplain #stop() stopped} already */ publicvoidstart(){ switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: // 通过CAS保证只能由一个线程开启时间轮内的线程 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: thrownew IllegalStateException("cannot be started once stopped"); default: thrownew Error("Invalid WorkerState"); }
// Wait until the startTime is initialized by the worker. while (startTime == 0) { try { // CountDownLatch startTimeInitialized = new CountDownLatch(1); // startTimeInitialized 是一个CountDownLatch // 用于保证工作线程的 startTime 属性初始化 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
publicvoidrun(){ // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; }
// Notify the other threads waiting for the initialization at start(). // 唤醒调用 HashedWheelTimer#start() 方法被阻塞的线程 startTimeInitialized.countDown();
// Fill the unprocessedTimeouts so we can return them from stop() method. // 运行到这说明时间轮已关闭 // 将还未执行的任务以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去 // 还未执行的任务可能会在两个地方,一:时间轮数组内,二:队列中 for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } // 处理过期任务 processCancelledTasks(); }
/** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ privatelongwaitForNextTick(){ // 计算时钟下次拨动的相对时间 long deadline = tickDuration * (tick + 1);
if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } }
// Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 // 这里是为了兼容 Windows 平台,因为 Windows 平台的调度最小单位为 10ms, // 如果不是 10ms 的倍数,可能会引起 sleep 时间不准确 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; if (sleepTimeMs == 0) { sleepTimeMs = 1; } }
privatevoidtransferTimeoutsToBuckets(){ // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. // 一次最多只处理队列中的 100000 个任务 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. // 过滤已经取消了的任务 continue; }
/** * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. */ publicvoidexpireTimeouts(long deadline){ HashedWheelTimeout timeout = head;
// process all timeouts while (timeout != null) { HashedWheelTimeout next = timeout.next; // 任务执行的圈数 > 0,表示任务还需要经过 remainingRounds 圈时钟循环才能执行 if (timeout.remainingRounds <= 0) { // 从链表中移除当前任务,并返回链表中下一个任务 next = remove(timeout); if (timeout.deadline <= deadline) { // 执行任务 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. thrownew IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } elseif (timeout.isCancelled()) { // 移除取消的任务 next = remove(timeout); } else { // 圈数-1 timeout.remainingRounds --; } timeout = next; } }
执行任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicvoidexpire(){ // CAS 将任务状态设置为已过期,防止任务重复执行 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; }
try { task.run(this); // 运行任务 } catch (Throwable t) { // 捕获异常 if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } }
public Set<Timeout> stop(){ // 终止时间轮的线程不能是时间轮的执行线程 if (Thread.currentThread() == workerThread) { thrownew IllegalStateException( HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } // CAS 将时间轮的状态修改为关闭状态 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. // 到这里说明修改失败,之前时间轮的状态可能为 WORKER_STATE_INIT 或 WORKER_STATE_SHUTDOWN // 为 WORKER_STATE_INIT 表示时间轮没有任务,因此不用返回未处理的任务,但是需要将时间轮实例 -1 // 为 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失败,什么都不用做,因为 CAS 成功的线程会处理 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } // CAS 操作失败,或者时间轮没有处理过任务,返回空的任务列表 return Collections.emptySet(); }