0%

时间轮算法

基于优先队列插入、删除延迟任务的时间复杂为O(logn);

基于时间轮算法插入、删除延迟任务的时间复杂度是 O(1) 。

前置知识

守护线程

在 Java 中线程分为两类:

  • 用户线程(User Thread)
  • 守护线程(Daemon Thread)

通常默认创建的线程就是用户线程,通过 Thread.setDaemon(true) 可以将线程设置为守护线程。

当所有的用户线程都退出时,守护线程也会自动退出。例如:jvm 的垃圾回收线程就是一个守护线程,当所有用户线程都执行完毕后,垃圾回收线程就没用了,也就自己关闭了。

Hook 线程

当 jvm 进程退出的时候,或者受到了系统的中断信号,hook线程就会启动,一个线程可以注入多个钩子。

例如:

1
2
3
4
5
6
7
8
9
10
public class HookTest {

public static void main(String[] args) {
System.out.println("main thread start");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("hook thread...");
}));
System.out.println("main thread end");
}
}

输出:

1
2
3
main thread start
main thread end
hook thread...

注意事项:如果是通过 kill -9 pid这种方式强制杀死的进程,那进程是不会去执行 Hook 线程的。

Timer

使用示例

延迟1秒执行 task1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TimerTest {

public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task1 in " + System.currentTimeMillis());
}
};
timer.schedule(task1, 1000);
System.out.println("start in " + System.currentTimeMillis());
}
}

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Timer {
/**
* 优先队列,本质上是一个基于数组的小顶堆,顶堆是下一个需要执行的任务
* The timer task queue. This data structure is shared with the timer
* thread. The timer produces tasks, via its various schedule calls,
* and the timer thread consumes, executing timer tasks as appropriate,
* and removing them from the queue when they're obsolete.
*/
private final TaskQueue queue = new TaskQueue();

/**
* 执行延迟任务的线程
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);

/**
* Creates a new timer whose associated thread has the specified name.
* The associated thread does <i>not</i>
* {@linkplain Thread#setDaemon run as a daemon}.
*
* @param name the name of the associated thread
* @throws NullPointerException if {@code name} is null
* @since 1.5
*/
public Timer(String name) {
thread.setName(name);
thread.start(); // 构造时默认启动执行线程
}
}

Timer 的核心就是一个优先队列和一个执行任务的线程。由于优先队列是基于堆实现的,根据堆的特性可知添加任务和删除任务的时间复杂度是:O(log n)。

运行原理

执行延迟任务的线程逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait(); // 如果队列为空则一直等待,添加任务时则会通知
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die

// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin(); // 不是周期性任务则直接移除
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
// 是周期性任务则修改当前任务时间为下次执行的时间
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}

运行流程大致如下:

TimerThread 不断地拿排着的第一个任务的执行时间和当前时间做对比。如果时间到了先看看这个任务是不是周期性执行的任务,如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除,最后执行任务,如果时间还未到则调用 wait() 等待

小结

  1. 优先队列的插入和删除的时间复杂度是O(logn),当数据量大的时候,频繁的入堆出堆性能有待考虑。
  2. 单线程执行,那么如果任务执行的时间较长则会影响下一个任务的执行时间。
  3. 没有捕获异常,一个任务出错的时候会导致之后的任务都无法执行。

ScheduledThreadPoolExecutor

JDK1.5 引入了 ScheduledThreadPoolExecutor,它是一个具有更多功能的 Timer 的替代品,允许多个执行线程。如果只设置一个执行线程和 Timer 没啥差别,不过它对异常进行了处理。

使用示例

1
2
3
4
5
6
7
8
public class ScheduledThreadPoolExecutorTest {

public static void main(String[] args) {
int corePool = Runtime.getRuntime().availableProcessors() * 2;
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(corePool);
scheduledExecutorService.schedule(() -> System.out.println("task1"), 1L, TimeUnit.SECONDS);
}
}

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {

/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// ...
}
}

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService接口。其中:

DelayedWorkQueue 继承了阻塞队列,实现了优先队列,也是利用数组实现的小顶堆。

ScheduledFutureTask 继承自 FutureTask 重写了 run 方法,实现了周期性任务的需求。

运行原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 如果不是周期性任务,直接运行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); // 重设下一次执行任务时间
reExecutePeriodic(outerTask); // 重新入队
}
}
}
// ScheduledFutureTask.super.run(); 父类FutureTask中的run方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) { // 捕获异常
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

小结

ScheduledThreadPoolExecutor 是用来代替 Timer 的, 运行流程和 Timer 差不多,也是维护一个优先队列然后通过重写 task 的 run 方法来实现周期性任务,主要差别在于能多线程运行任务,不会单线程阻塞,由于优先队列都是使用小顶堆来实现,所以添加和删除的时间复杂度都是O(logn)

运行时有捕获异常,所以一个任务出错也不会影响之后的任务

DelayQueue

Java 中还有个延迟队列 DelayQueue,加入延迟队列的元素都必须实现 Delayed 接口。延迟队列内部是利用 PriorityQueue 实现的,所以还是利用优先队列。Delayed 接口继承了Comparable 因此优先队列是通过 delay 来排序的。

核心属性

1
2
3
4
5
6
7
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> { // 元素必须实现Delayed接口

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>(); // 优先队列
// ...
}

take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await(); // 如果队列为空则等待元素插入
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll(); // 小于等于0则说明时间到了,直接出队
first = null; // don't retain ref while waiting
if (leader != null) // leader是减少不必要的等待,没抢到当leader线程都await()
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 等待时间到达
} finally {
if (leader == thisThread) // 执行完了重置leader
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(); // 唤醒想当leader的线程
lock.unlock();
}
}

小结

DelayQueue 通过优先队列来获取最早需要执行的任务,而优先队列是一个基于数组实现的小顶堆,其添加和删除任务的时间复杂度为 O(logn)。

时间轮算法

Timer 、ScheduledThreadPool和DelayQueue 添加和删除任务的时间复杂度为 O(logn)。而时间轮算法插入删除的时间复杂度是 O(1) ,因此在Netty、ZooKeeper等高性能中间件中都存在时间轮算法的踪影。

Kafka 时间轮

Kafka中存在一些定时任务,如DelayedFetch、DelayedProduce、DelayedHeartbeat等,在Kafka中,定时任务的添加、轮转、执行、消亡等是通过时间轮来实现的。

JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为O(nlogn)并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、ZooKeeper等组件中都存在时间轮的踪影。

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。

TimingWheel中的每个双向环形链表TimerTaskList都会有一个哨兵节点(sentinel),引入哨兵节点可以简化边界条件。哨兵节点也称为哑元节点(dummy node),它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的。如果一个链表有哨兵节点,那么线性表的第一个元素应该是链表的第二个节点。

如果此时有个定时为350ms的任务该如何处理?直接扩充wheelSize的大小么?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个wheelSize为100万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。Kafka为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

像现实中的钟表一样,“一格一格”地走,这样就需要有一个线程一直不停的执行,而大多数情况下,时间轮中的bucket大部分是空的,指针的“推进”就没有实质作用。为了减少 空推进,Kafka引入了DelayQueue。Kafka中的定时器借了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList(非哨兵节点)都加入DelayQueue。DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头。Kafka 中的 TimingWheel 专门用来执行插入和删除 TimerTaskEntry的操作,而 DelayQueue 专门负责时间推进的任务。

Netty 时间轮

使用示例

1
2
3
4
5
6
7
8
public class HashedWheelTimerTest {

public static void main(String[] args) {
HashedWheelTimer wheelTimer = new HashedWheelTimer();
wheelTimer.newTimeout(timeout -> log.info("task1"), 1, TimeUnit.SECONDS);
log.info("start ");
}
}

时间轮状态

时间轮有以下三种状态:

  • WORKER_STATE_INIT:初始化状态,此时时间轮内的工作线程还没有开启
  • WORKER_STATE_STARTED:运行状态,时间轮内的工作线程已经开启
  • WORKER_STATE_SHUTDOWN:终止状态,时间轮停止工作

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public HashedWheelTimer(ThreadFactory threadFactory) {
// 这里是经过合并的源码
this(Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 512, true, -1);
}

/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param leakDetection {@code true} if leak detection should be enabled always,
* if false it will only be enabled if the worker thread is
* not a daemon thread.
* @param maxPendingTimeouts The maximum number of pending timeouts after which call to
* {@code newTimeout} will result in
* {@link java.util.concurrent.RejectedExecutionException}
* being thrown. No maximum pending timeouts limit is assumed
* if this value is 0 or negative.
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration,
TimeUnit unit,
int ticksPerWheel,
boolean leakDetection,
long maxPendingTimeouts) {

ObjectUtil.checkNotNull(threadFactory, "threadFactory");
ObjectUtil.checkNotNull(unit, "unit");
ObjectUtil.checkPositive(tickDuration, "tickDuration");
ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");

// Normalize ticksPerWheel to power of two and initialize the wheel.
// 和HashMap类似,时间轮大小为大于等于 ticksPerWheel 的第一个 2 的幂,方便后续通过位运算代替模运算
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;

// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);

// Prevent overflow.时间轮内的时钟拨动频率太大则抛出异常
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
// 时间轮内的时钟拨动频率最小为1毫秒
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
// 创建工作线程
workerThread = threadFactory.newThread(worker);
// 检测内存是否泄漏
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
// 初始化最大等待任务数,0或负数表示不限制最大等待任务数
this.maxPendingTimeouts = maxPendingTimeouts;
// 如果创建的时间轮实例大于 64,打印日志,并且这个日志只会打印一次
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}

构造函数中的 tickDuration 参数非常重要,应该根据业务的范围设置合理的参数。如果时间轮的拨动频率过大,则可能导致任务触发时间不准确。如果时间轮的拨动频率较小,时间轮转动频繁,任务少的情况下加载不到任务,属于一直空转的状态,会占用 CPU 线程资源。

为了防止时间轮占用过多的 CPU 资源,当创建的时间轮示例数大于 64 时会以日志的方式提示。

构造函数中只是初始化了轮线程,并没有开启,当第一次往时间轮内添加任务时,线程才会开启。

往时间轮中添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(task, "task");
ObjectUtil.checkNotNull(unit, "unit");
// 等待的任务数+1 pendingTimeoutsCount是AtomicLong的实例,表示等待的任务数
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 如果时间轮内等待的任务数大于最大值,则会抛出异常
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 开启时间轮内的线程
start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
// 计算当前添加任务的执行时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
// 防止deadline溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 将任务加入队列
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}

任务会先保存在队列中,当时间轮的时钟拨动时才会判断是否将队列中的任务加载进时间轮。

开启时间轮内的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
// 通过CAS保证只能由一个线程开启时间轮内的线程
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}

// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
// CountDownLatch startTimeInitialized = new CountDownLatch(1);
// startTimeInitialized 是一个CountDownLatch
// 用于保证工作线程的 startTime 属性初始化
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}

这里通过 CAS 加锁的方式保证线程安全,避免多次启动。

执行线程启动后, start() 方法会被阻塞,等执行线程的 startTime 属性初始化完成后才被唤醒。为什么只有等 startTime 初始化后才能继续执行呢?因为上面的 newTimeout 方法在执行线程启动后,需要计算当前添加进来任务的执行时间(任务在何时执行),而这个执行时间是根据 startTime 计算的。

时间轮调度

workerThread.start() 执行后会调用内部 Worker 的 run方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}

// Notify the other threads waiting for the initialization at start().
// 唤醒调用 HashedWheelTimer#start() 方法被阻塞的线程
startTimeInitialized.countDown();

do {
// 拨动时钟
final long deadline = waitForNextTick();
if (deadline > 0) {
// 通过与运算计算任务的索引
int idx = (int) (tick & mask);
// 处理过期任务
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
// 将任务加载进时间轮
transferTimeoutsToBuckets();
// 执行当前时间轮槽内的任务
bucket.expireTimeouts(deadline);
tick++;
}
// 判断时间轮状态是否是运行状态
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// Fill the unprocessedTimeouts so we can return them from stop() method.
// 运行到这说明时间轮已关闭
// 将还未执行的任务以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
// 还未执行的任务可能会在两个地方,一:时间轮数组内,二:队列中
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 处理过期任务
processCancelledTasks();
}

时间轮每拨动一次 tick 就会 +1,根据这个值与(时间轮数组长度 - 1)进行 & 运算,可以定位时间轮数组内的槽。因为 tick 值一直在增加,所以时间轮数组看起来就像一个不断循环的圆。执行步骤大致如下:

  1. 先初始化 startTime 值,因为后面任务执行的时间是根据 startTime 计算的
  2. 拨动时钟,如果时间未到,则 sleep 一会儿
  3. 处理过期的任务
  4. 将任务加载进时间轮
  5. 执行当前时钟对应时间轮内的任务
  6. 时间轮关闭,将所有未执行的任务封装到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
  7. 处理过期的任务

拨动时钟

如果时间轮设置的 tickDuration 为 100ms 拨动一次,当时钟拨动一次后,应该计算下一次时钟拨动的时间,如果还没到就 sleep 一会儿,等到拨动时间再醒来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
// 计算时钟下次拨动的相对时间
long deadline = tickDuration * (tick + 1);

for (;;) {
// 获取当前时间的相对时间
final long currentTime = System.nanoTime() - startTime;
// 计算距离下次拨动时钟的时间,+ 999999的目的是为了保证足够的 sleep 时间
// 例如:当 deadline - currentTime = 2000002 时,不加 999999 就会只休眠2ms,
// 而 2ms 其实还未达到 deadline 的时间点,加 999999 则会多休眠1ms
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}

// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
// 这里是为了兼容 Windows 平台,因为 Windows 平台的调度最小单位为 10ms,
// 如果不是 10ms 的倍数,可能会引起 sleep 时间不准确
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
if (sleepTimeMs == 0) {
sleepTimeMs = 1;
}
}

try {
// sleep 到下次时钟拨动
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this)
== WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}

如果时间不到就 sleep 等待一会儿,为了使任务时钟准确,可以从上面的代码中看出 Netty 做了一些优化,比如 sleepTimeMs 的计算,Windows 平台的处理等。

将任务从队列加载进时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 一次最多只处理队列中的 100000 个任务
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
// 过滤已经取消了的任务
continue;
}

// 计算当前任务到执行还需要经过几次时钟拨动
long calculated = timeout.deadline / tickDuration;
// 计算当前任务到执行还需要经过几圈时钟拨动
timeout.remainingRounds = (calculated - tick) / wheel.length;

// Ensure we don't schedule for past.
// 有的任务可能在队列里很长时间,时间过期了也没有被调度,将这种情况的任务放在当前轮次内执行
final long ticks = Math.max(calculated, tick);
// 计算任务在时间轮数组中的槽
int stopIndex = (int) (ticks & mask);

HashedWheelBucket bucket = wheel[stopIndex];
// 将任务放到时间轮的数组中,多个任务可能定位时间轮的同一个槽,这些任务通过以链表的形式链接
bucket.addTimeout(timeout);
}
}

public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}

任务刚加进来不会立即到时间轮中去,而是暂时保存到一个队列中,当时间轮时钟拨动时,会将任务从队列中加载进时间轮内。

时间轮每次最大处理 100000 个任务,因为任务的执行时间是用户自定义的,所以需要计算任务到执行需要经过多少次时钟拨动,并计算时间轮拨动的圈数。接着将任务加载进时间轮对应的槽内,可能有多个任务经过 hash 计算后定位到同一个槽,这些任务会以双向链表的结构保存,有点类似 HashMap 处理碰撞的情况。

执行任务

执行当前时间轮槽内的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// 任务执行的圈数 > 0,表示任务还需要经过 remainingRounds 圈时钟循环才能执行
if (timeout.remainingRounds <= 0) {
// 从链表中移除当前任务,并返回链表中下一个任务
next = remove(timeout);
if (timeout.deadline <= deadline) {
// 执行任务
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
// 移除取消的任务
next = remove(timeout);
} else {
// 圈数-1
timeout.remainingRounds --;
}
timeout = next;
}
}

执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void expire() {
// CAS 将任务状态设置为已过期,防止任务重复执行
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}

try {
task.run(this); // 运行任务
} catch (Throwable t) { // 捕获异常
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}

时间轮槽内的任务以链表形式存储,这些任务执行的时间可能会不一样,有的在当前时钟执行,有的在下一圈或者下两圈对应的时钟执行。当任务在当前时钟执行时,需要将这个任务从链表中删除,重新维护链表关系。

终止时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public Set<Timeout> stop() {
// 终止时间轮的线程不能是时间轮的执行线程
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
// CAS 将时间轮的状态修改为关闭状态
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
// 到这里说明修改失败,之前时间轮的状态可能为 WORKER_STATE_INIT 或 WORKER_STATE_SHUTDOWN
// 为 WORKER_STATE_INIT 表示时间轮没有任务,因此不用返回未处理的任务,但是需要将时间轮实例 -1
// 为 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失败,什么都不用做,因为 CAS 成功的线程会处理
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// CAS 操作失败,或者时间轮没有处理过任务,返回空的任务列表
return Collections.emptySet();
}

try {
boolean interrupted = false;
while (workerThread.isAlive()) {
// 中断时间轮工作线程
workerThread.interrupt();
try {
// 终止时间轮的线程等待时间轮工作线程 100ms,
// 这个过程主要是为了时间轮工作线程处理未执行的任务
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}

if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// 返回未处理的任务
return worker.unprocessedTimeouts();
}

时间轮停止运行后会将未执行的任务返回出去,至于怎么处理这些任务,由业务方自己定义,这个流程和线程池的 shutdownNow 方法是类似的。

小结

  • Timer、ScheduledThreadPool 和 DelayQueue 都是基于优先队列实现,其 O(logn) 的时间复杂度在任务数多的情况下频繁的入队出队对性能来说有损耗。
  • Timer 是单线程执行,会有阻塞的风险,并且对异常没有做处理。
  • ScheduledThreadPool 则可以使用多线程执行任务,并且线程池对异常做了处理。
  • DelayQueue 本质上就是一个具有优先级的阻塞队列。
  • 时间轮更适合任务数很大的延时场景,它的任务插入和删除时间复杂度都为O(1)。
  • 对于延迟超过时间轮所能表示的范围,Netty是通过增加一个字段-轮数,而Kafka是使用多层次时间轮。
  • Netty 的时间轮实现会有空推进的问题,而 Kafka则借助 DelayQueue 来避免空推进的问题。
坚持原创技术分享,您的支持将鼓励我继续创作!