背景
在实际的业务场景中,我们常常需要周期性执行一些任务,比如巡查系统资源,处理过期数据等等。这些事情如果人工去执行的话,无疑是对人力资源的浪费。因此我们就开发出了定时任务。目前业界已有许多出色的定时任务框架,如quartz,elastic-job,包括SpringBoot也提供了定时任务,当然JDK本身也提供了定时任务功能。 那么我们在用这些框架的时候,有没有想过它们是怎么实现定时任务的呢?时间轮算法就是这样一种实现定时任务的方法。
一、概述
时间轮算法是通过一个时间轮去维护定时任务,按照一定的时间单位对时间轮进行划分刻度。然后根据任务的延时计算任务该落在时间轮的第几个刻度,如果任务时长超出了时间轮的刻度数量,则增加一个参数记录时间轮需要转动的圈数。
时间轮每转动一次就检查当前刻度下的任务圈数是否为0,如果为0说明时间到了就执行任务,否则就减少任务的圈数。这样看起来已经很好了,可以满足基本的定时任务需求了,但是我们还能不能继续优化一下呢?答案是可以的。想想我们家里的水表,它是不是有多个轮子在转动,时间轮是不是也可以改造成多级联动呢?建立3个时间轮,月轮、周轮、日轮,月轮存储每个月份需要执行定时任务,转动时将当月份的任务抛到周轮,周轮转动时将当天的任务抛到日轮中,日轮转动时直接执行当前刻度下的定时任务。
1.1 绝对时间和相对时间
定时任务一般有两种:
- 1、约定一段时间后执行。
- 2、约定某个时间点执行。
其实这两者是可以互相转换的,比如现在有一个定时任务是12点执行,当前时间是9点,那就可以认为这个任务是3小时后执行。同样,现在又有一个任务,是3小时后执行,那也可以认为这个任务12点执行。
假设我们现在有3个定时任务A、B、C,分别需要在3点、4点和9点执行,我们把它们都转换成绝对时间。
只需要把任务放到它需要被执行的时刻,然后等到时针转到相应的位置时,取出该时刻放置的任务,执行就可以了。这就是时间轮算法的核心思想。
1.2 重复执行
多数定时任务是需要重复执行,比如每天上午9点执行生成报表的任务。对于重复执行的任务,其实我们需要关心的只是下次执行时间,并不关心这个任务需要循环多少次,还是那每天上午9点的这个任务来说。
- 1、比如现在是下午4点钟,我把这个任务加入到时间轮,并设定当时针转到明天上午九点(该任务下次执行的时间)时执行。
- 2、时间来到了第二天上午九点,时间轮也转到了9点钟的位置,发现该位置有一个生成报表的任务,拿出来执行。
- 3、同时时间轮发现这是一个循环执行的任务,于是把该任务重新放回到9点钟的位置。
- 4、重复步骤2和步骤3。
如果哪一天这个任务不需要再执行了,那么直接通知时间轮,找到这个任务的位置删除掉就可以了。由上面的过程我们可以看到,时间轮至少需要提供4个功能:
- 1、加入任务
- 2、执行任务
- 3、删除任务
- 4、沿着时间刻度前进
1.3 时间轮的数据结构
时钟可以使用数组来表示,那么时钟的每一个刻度就是一个槽,槽用来存在该刻度需要被执行的定时任务。正常业务中,同一时刻中是会存在多个定时任务的,所以每个槽中放一个链表或者队列就可以了,执行的时候遍历一遍即可。
同一时刻存在多个任务时,只要把该刻度对应的链表全部遍历一遍,执行(扔到线程池中异步执行)其中的任务即可。
1.4 时间刻度不够用
增加时间轮的刻度
现在有我有2个定时任务,一个任务每周一上午9点执行,另一个任务每周三上午9点执行。最简单的办法就是增大时间轮的长度,可以从12个加到168 (一天24小时,一周就是168小时),那么下周一上午九点就是时间轮的第9个刻度,这周三上午九点就是时间轮的第57个刻度。
这样做的缺点:
- 1、时间刻度太多会导致时间轮走到的多数刻度没有任务执行,比如一个月就2个任务,我得移动720次,其中718次是无用功。
- 2、时间刻度太多会导致存储空间变大,利用率变低,比如一个月就2个任务,我得需要大小是720的数组,如果我的执行时间的粒度精确到秒,那就更恐怖了。
1.5 任务增加round属性
现在时间轮的刻度还沿用24,但是槽中的每个任务增加一个round属性,代表时钟转过第几圈之后再次转到这个槽的时候执行。
上图代表任务三在指针下一圈移动时执行,整体流程就是时间轮没移动一个刻度的时候都要遍历槽中所有任务,对每个任务的round属性减1,并取出round为0的任务调度,这样可以解决增加时间轮带来的空间浪费。但是这样带来的问题时,每次移动刻度的耗时会增加,当时间刻度很小(秒级甚至毫秒级),任务列表有很长,这种方案是不能接受的。
1.6 分层时间轮
分层时间轮是这样一种思想:
- 1、针对时间复杂度的问题:不做遍历计算round,凡是任务列表中的都应该是应该被执行的,直接全部取出来执行。
- 2、针对空间复杂度的问题:分层,每个时间粒度对应一个时间轮,多个时间轮之间进行级联协作。
假设现在有3个定时任务:
- 任务一每天上午9点执行
- 任务二每周2上午9点执行
- 任务三每月12号上午9点执行。
根据这三个任务的调度粒度,可以划分为3个时间轮,月轮、周轮和天轮,初始添加任务时,任务一被添加到天轮上,任务二被添加到周轮,任务三被添加到月轮上。三个时间轮按各自的刻度运转,当周轮移动到刻度2时,取出任务二丢到天轮上,当天轮移动到刻度9时执行。同样任务三在移动到刻度12时,取出任务三丢给月轮。以此类推。
1.7 round时间轮和分层时间轮的一点比较
相比于round时间轮思想,采用分层时间轮算法的优点在于:只需要多耗费极少的空间(从1个时间轮到3个时间轮),就能实现多线程在效率上的提高(一个时间轮是一个线程去行走,3个时间轮可以3个线程行走)。当然这是相对的,若提交的任务都是每隔几个小时重复执行,那显然小时时间轮比月、周、小时时间轮组合的耗费空间少,且执行时间还相同。
1.8 时间轮的应用
时间轮的思想应用范围非常广泛,各种操作系统的定时任务调度,Crontab、Dubbo、新版的XXL-JOB、还有基于java的通信框架Netty中也有时间轮的实现,几乎所有的时间任务调度系统采用的都是时间轮的思想。
至于采用round型的时间轮还是采用分层时间轮,看实际需要吧,时间复杂度和实现复杂度的取舍。
二、时间轮定时使用方式
用Netty的HashedWheelTimer来实现,给Pom加上下面的依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.75.Final</version> </dependency>使用测试:
@RunWith(SpringRunner.class) @SpringBootTest public class Test { @Test public void test() throws InterruptedException { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); HashedWheelTimer timer = new HashedWheelTimer(new NamedThreadFactory("timer-task"), 1, TimeUnit.MILLISECONDS,8); TimerTask timerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("hello world " + LocalDateTime.now().format(formatter)); //执行完成之后再次加入调度 timer.newTimeout(this, 4, TimeUnit.SECONDS); } }; //将定时任务放入时间轮 timer.newTimeout(timerTask, 4, TimeUnit.SECONDS); Thread.currentThread().join(); } }在这里使用的是 netty 使用时间轮算法实现的HashedWheelTimer来做的每隔 4s 的定时调度。
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { this(threadFactory, tickDuration, unit, ticksPerWheel, true); }使用方式比较简单,创建一个HashedWheelTimer时间轮定时器对象,threadFactory:创建线程的线程工厂
- tickDuration:一个间隔时间(步长)
- tickDuration:间隔时间的单位
- ticksPerWheel:时间轮的大小
输出如下:
hello world 2022-04-12 18:46:36 hello world 2022-04-12 18:46:40 hello world 2022-04-12 18:46:44 hello world 2022-04-12 18:46:48 hello world 2022-04-12 18:46:52 hello world 2022-04-12 18:46:56 hello world 2022-04-12 18:47:00 hello world 2022-04-12 18:47:04 hello world 2022-04-12 18:47:08 hello world 2022-04-12 18:47:12 hello world 2022-04-12 18:47:16 hello world 2022-04-12 18:47:20三、时间轮定时内部原理
时间轮定时器原理基本都是如下图:
时间轮算法可以简单的看成一个循环数组+双向链表的数据结构实现的。 循环数组构成一个环形结构,指针每隔 tickDuration 时间走一步,每个数组上挂载一个双向链表结构的定时任务列表。
双向链表上的任务有个属性为 remainingRounds,即当前任务剩下的轮次是多少,每当指针走到该任务的位置时,remainingRounds 减 1,直到remainingRounds 为 0 时,定时任务触发。
通过时间轮算法的原理图我们可以知道,tickDuration 越小,定时任务越精确。
3.1 时间轮定时源码剖析
3.1.1 构造方法
首先从 HashedWheelTimer 的构造方法分析
public class HashedWheelTimer implements Timer { public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { //线程工厂非null判断 if (threadFactory == null) { throw new NullPointerException("threadFactory"); } //时间单位非null判断 if (unit == null) { throw new NullPointerException("unit"); } //时间间隔(步长)大于0判断 if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } //循环数组长度大于0判断 if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. // 将ticksPerWheel修改为2的整数次幂 并且新建数组 wheel = createWheel(ticksPerWheel); // 数组长度-1,其二进制均为1. 通过指针tick&mask 获取当前的数组下标,类似于hashmap的 hashcode&(len -1) mask = wheel.length - 1; // Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); // Prevent overflow. if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } if (duration < MILLISECOND_NANOS) { if (logger.isWarnEnabled()) { logger.warn("Configured tickDuration %d smaller then %d, using 1ms.", tickDuration, MILLISECOND_NANOS); } this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } //创建工作线程,该线程会定期的移动指针,扫描链表任务,后面再分析 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; this.maxPendingTimeouts = maxPendingTimeouts; //判断HashedWheelTimer实例是否创建太多,如果是就输出一个日志 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } }构造方法比较简单明了,主要是做一些初始化工作,比如数组的长度控制为2的整数次幂,新建数组,新建工作线程等。
3.2 添加任务
继续往下看如何向时间轮定时器添加一个定时任务。
public class HashedWheelTimer implements Timer { @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } //一个计数器,表示当前在队列中等待的任务数量 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); //默认maxPendingTimeouts为-1,如果该值>0.添加新任务时会进行判断,如果当前任务大于maxPendingTimeouts,则跑出拒绝异常 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } //检测工作线程扫描是否启动,如果未启动,启动下 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. //startTime为工作线程启动的时间,deadline为:System.nanoTime()+任务延迟时间-工作线程的启动时间 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. //溢出判断,因为startTime是在start()方法中启动工作线程后赋值的, //在delay大于0的情况下,deadline是不可能小于0,除非溢出了。如果溢出了为deadline赋值一个最大值 if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } //创建HashedWheelTimeout对象 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); //将任务加入timeouts队列 timeouts.add(timeout); return timeout; } }该方法主要执行以下几个工作
- 1.参数非空校验
- 2.任务数量最大值检测
- 3.工作线程启动
- 4.获取任务的 deadline,将任务封装为 HashedWheelTimeout 对象
- 5.将 HashedWheelTimeout 对象放入任务队列 timeouts
3.3 工作线程启动
下面简单看下 start 方法
public class HashedWheelTimer implements Timer { public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { //如果发现当前工作线程的状态为WORKER_STATE_INIT 初始化状态,则设置线程状态为 WORKER_STATE_STARTED并 启动工作线程 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. //startTime 初始值为0,并且在工作线程启动后设置。startTimeInitialized是一个CountDownLatch锁,在工作线程启动后释放 while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } } }该方法主要是启动工作线程并等待工作线程启动完成。 继续看工作线程的 run 方法做什么事情
3.4 工作线程run方法
public class HashedWheelTimer implements Timer { private final class Worker implements Runnable { private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); private long tick; @Override public void run() { // Initialize the startTime. //线程启动后初始化startTime 时间为System.nanoTime() startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). //释放start方法中的CountDownLatch锁 startTimeInitialized.countDown(); //在当前工作线程状态一直为 WORKER_STATE_STARTED 时循环执行 do { //waitForNextTick 主要是指针跳动,内部使用Thread.sleep实现 final long deadline = waitForNextTick(); //小于0表示收到了关闭的信号 if (deadline > 0) { //tick和mask进行按位与操作获取到当前数组下标位置 int idx = (int) (tick & mask); //从时间轮中移除所有已经取消的定时任务 processCancelledTasks(); //获取到下标对应的链表头 HashedWheelBucket bucket = wheel[idx]; //将队列中的定时任务放入到时间轮中 transferTimeoutsToBuckets(); //遍历链表任务,将达到执行时间的任务触发执行 bucket.expireTimeouts(deadline); //指针+1 tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. //工作线程停止后,将时间轮上的所有任务放入unprocessedTimeouts集合 for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } //将任务队列中的任务也放入unprocessedTimeouts集合 for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } //移除所有的未处理的定时任务 processCancelledTasks(); } } }该部分代码主要分为以下几个部分
- 设置线程的启动时间 startTime
- 在工作线程启动的状态下
- 根据用户配置的 tickDuration 指针每次跳动一下
- 从时间轮中移除所有已经取消的定时任务
- 将队列中的定时任务放入到时间轮中
- 遍历链表任务,将达到执行时间的任务触发执行
- 工作线程停止后的清理工作
- 下面看一下指针跳动的代码
3.5 指针跳动
public class HashedWheelTimer implements Timer { private final class Worker implements Runnable { private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); private long tick; private long waitForNextTick() { //获取下一个指针的deadline时间 long deadline = tickDuration * (tick + 1); for (;;) { //当前工作线程的活动时间 final long currentTime = System.nanoTime() - startTime; //计算还需要多久达到deadline 。 //这里加上999999的原因是因为/只会取整数部分,并且是使用Thread.sleep时间的,参数为毫秒。 //为了保证任务不被提前执行,加上999999后就能够向上取整1ms。 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; //sleepTimeMs 小于0表示达到了任务的触发时间 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } } }通过源码分析我们可以看到时间轮算法实现的指针跳动是通过Thread.sleep 实现的,难以理解的就是 (deadline - currentTime + 999999) / 1000000;
3.6 将队列任务放入时间轮中
在工作线程的 run 方法中会调用 transferTimeoutsToBuckets方法,该方法会将用户提交到队列中的定时任务移动到时间轮中,下面具体分析下
public class HashedWheelTimer implements Timer { private final class Worker implements Runnable { private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); private long tick; private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. //每次最多只迁移 10W 个定时任务,主要是为了防止迁移时间过长,导致时间轮中的任务延迟执行 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } //如果任务已经被取消,就跳过 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } //计算任务需要放入的数组位置 long calculated = timeout.deadline / tickDuration; //由于时间轮中的数组是循环数组,计算还需要几个轮次 timeout.remainingRounds = (calculated - tick) / wheel.length; //calculated 和tick 取最大,主要是为了保证过时的任务能够被调度。 //正常情况下calculated是大于tick的, //如果某些任务执行时间过长,导致tick大于calculated,此时直接把过时的任务放到当前链表队列 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. //按位与获取任务的执行位置 int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; //将任务放入当前数组上的链表 bucket.addTimeout(timeout); } } } }transferTimeoutsToBuckets 方法很简单,我们主要要记住两点
- 1.每次最多会迁移10W 个队列中的任务到时间轮中,为了保证不影响工作线程的指针跳动
- 2.并且我们发现取消的任务会直接跳过,过时的任务会直接放到当前位置。
3.7 链表任务遍历
public class HashedWheelTimer implements Timer { private static final class HashedWheelBucket { // Used for the linked-list datastructure private HashedWheelTimeout head; private HashedWheelTimeout tail; /** * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}. */ public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts //遍历链表的所有任务 while (timeout != null) { HashedWheelTimeout next = timeout.next; //如果剩下的轮次<=0 if (timeout.remainingRounds <= 0) { //从双向链表中移除该任务 next = remove(timeout); //如果当前任务的deadline小于目前时间轮的deadline,表示任务已经可以被触发 if (timeout.deadline <= deadline) { //任务执行 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { //任务取消也从链表中移除 next = remove(timeout); } else { // 任务的剩余轮次-1 timeout.remainingRounds --; } //链表遍历 timeout = next; } } } }该方法主要是遍历链表上的定时任务
- 任务所剩轮次为 0 并且任务的 deadline 小于目前时间轮的 deadline,任务触发执行
- 任务被取消,从链表中移除
- 任务轮次大于 0 并且还未取消,轮次 -1
- 遍历下个定时任务
3.8 定时任务执行
public class HashedWheelTimer implements Timer { private static final class HashedWheelTimeout implements Timeout { public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } } } }定时任务执行代码,看着很简单,首先将任务的状态设置为ST_EXPIRED,然后直接调用 run方法执行任务,这里说明任务是在工作线程中执行的,也就是说如果任务执行时间过长,会影响其它定时任务的触发。
参考: https://blog.csdn.net/qq_34772568/article/details/105534389
https://blog.csdn.net/su20145104009/article/details/115636136
https://blog.csdn.net/code_geek/article/details/113133327
https://blog.csdn.net/beslet/article/details/119974430
https://blog.csdn.net/qq_34039868/article/details/105384808