大纲
1.什么是时间轮
2.HashedWheelTimer是什么
3.HashedWheelTimer的使用
4.HashedWheelTimer的运行流程
5.HashedWheelTimer的核心字段
6.HashedWheelTimer的构造方法
7.HashedWheelTimer添加任务和实行任务
8.HashedWheelTimer的完备源码
9.HashedWheelTimer的总结
10.HashedWheelTimer的应用
1.什么是时间轮
简朴来说,时间轮是一个高效利用线程资源举行批量化调理的调理器。首先把大批量的调理任务全部绑定到同一个调理器上,然后使用这个调理器对所有任务举行管理、触发、以及运行,所以时间轮能高效管理各种延时任务、周期任务、关照任务。
时间轮是以时间作为刻度构成的一个环形队列,所以叫做时间轮。这个环形队列通过一个HashedWheelBucket[]数组来实现,数组的每个元素称为槽,每个槽可以存放一个定时任务列表,叫HashedWheelBucket。HashedWheelBucket是一个双向链表,链表的每个节点表示一个定时任务项HashedWheelTimeout。在HashedWheelTimeout中封装了真正的定时任务TimerTask。
时间轮由多个时间格构成,每个时间格代表当前时间轮的基本时间跨度ticketDuration,此中时间轮的时间格的个数是固定的。
如下图示,有16个时间格(槽),假设每个时间格的单位是1s,那么整个时间轮走完一圈需要16s。每秒钟(即时间格的单位也可以为1ms、1min、1h等)指针会沿着顺时针方向转动一格。通过指针移动来获得每个时间格中的任务列表,然后遍历这个时间格内的双向链表的每个任务并实行,依此循环。
2.HashedWheelTimer是什么
Netty的HashedWheelTimer是一个粗略的定时器实现,之所以称为粗略的实现是因为该时间轮并没有严酷准时地实行定时任务,而是在每隔一个时间间隔之后的时间节点实行,并实行当前时间节点之前到期的定时任务。
不过具体的定时任务的时间实行精度,可以通过调节HashedWheelTimer构造方法的时间间隔的巨细来举行调节。在大多数网络应用的情况下,由于IO耽误的存在,所以并不会严酷要求具体的时间实行精度。因此默认100ms的时间间隔可以满足大多数情况,不需要再花精力去调节该时间精度。
3.HashedWheelTimer的使用
- public class HashedWheelTimerTest {
- //构建HashedWheelTimer时间轮
- //最后通过HASHED_WHEEL_TIMER.newTimeout()方法把需要延迟执行的任务添加到时间轮中
- private static final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(
- new DefaultThreadFactory("demo-timer"),//threadFactory参数表示创建处理任务的线程工厂
- 100,//tickDuration参数表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格
- TimeUnit.MILLISECONDS,
- 512,//ticksPerWheel参数表示时间轮上一共有多少个时间格,分配的时间格越多,占用内存空间就越大,这里是512
- true//leakDetection参数表示是否开启内存泄漏检测
- );
- public static void main(String[] args) {
- System.out.println("延时任务提交");
- //延时多久执行
- long delay = 10L;
- HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- System.out.println("延时任务触发");
- }
- }, delay, TimeUnit.SECONDS);
- }
- }
复制代码 4.HashedWheelTimer的运行流程
步调一:初始化时间轮
步调二:启动时间轮
步调三:添加任务,保存到延时任务队列
步调四:时间轮指针休眠阻塞,实现转动
步调五:休眠结束,指针指向下一个时间格(槽)
步调六:将已经取消的任务从对应的槽中移除
步调七:将延时任务队列的任务添加到对应的槽中
步调八:实行时间轮指针指向当前槽的到期任务
5.HashedWheelTimer中的关键字段
字段一:wheel
wheel是一个HashedWheelBucket数组,默认的数组巨细是512。可以认为wheel是一个TimerTask的哈希表,它的哈希函数是任务的截止日期。所以每个时间轮的时间格数ticksPerWheel默认是512。
字段二:tickDuration
时间格跨度,默认100ms。
字段三:ticksPerWheel
时间轮的格子数,默认512。
字段四:maxPendingTimeouts
时间轮中任务的最大数量。
字段五:deadline
延时任务的截止时间,值为当前时间 + 延时任务的延时时间 - 时间轮启动时间。
字段六:tick
时间轮启动以来指针总的转动次数。
字段七:remainingRounds
槽中延时任务剩余的圈(轮)数,为0时则表示需要实行延时任务了。
6.HashedWheelTimer的构造方法
步调一:构造参数校验及给实际实行延时任务的线程池taskExecutor赋值
步调二:将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂
步调三:初始化HashedWheelBucket数组wheel
步调四:校验tickDuration和ticksPerWheel
步调五:创建工作线程workerThread,用于指针转动和触发实行时间格里的延时任务
步调六:给时间轮中延时任务的最大数量maxPendingTimeouts赋值
步调七:检查HashedWheelTimer的实例数量,如果大于64则打印error日记

- //4.1.73.Final
- public class HashedWheelTimer implements Timer {
- private final HashedWheelBucket[] wheel;
- private final int mask;
- private final long tickDuration;
- private final Thread workerThread;
- private final ResourceLeakTracker<HashedWheelTimer> leak;
- private final Worker worker = new Worker();
- private final long maxPendingTimeouts;
- private final Executor taskExecutor;
- ...
- //Creates a new timer.
- //@param threadFactory 创建线程的工厂
- //@param tickDuration 每格的时间间隔,默认100ms,0.1秒
- //@param unit 时间单位,默认为毫秒
- //@param ticksPerWheel 时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算
- //@param leakDetection 如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略
- //@param maxPendingTimeouts 最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1
- //@param taskExecutor 任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它
- //@throws NullPointerException if either of threadFactory and unit is null
- //@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0
- public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) {
- //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值
- checkNotNull(threadFactory, "threadFactory");
- checkNotNull(unit, "unit");
- checkPositive(tickDuration, "tickDuration");
- checkPositive(ticksPerWheel, "ticksPerWheel");
- this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
- //2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算
- //3.初始化时间轮wheel
- wheel = createWheel(ticksPerWheel);
- //mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能
- mask = wheel.length - 1;
-
- //4.校验tickDuration和ticksPerWheel
- //Convert tickDuration to nanos.
- long duration = unit.toNanos(tickDuration);
- //防止溢出
- //tickDuration * ticksPerWheel必须小于Long.MAX_VALUE
- if (duration >= Long.MAX_VALUE / wheel.length) {
- throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));
- }
- //tickDuration不能小于1ms
- if (duration < MILLISECOND_NANOS) {
- logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
- this.tickDuration = MILLISECOND_NANOS;
- } else {
- this.tickDuration = duration;
- }
- //5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行
- workerThread = threadFactory.newThread(worker);
- leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
- //6.给时间轮中任务的最大数量maxPendingTimeouts赋值
- this.maxPendingTimeouts = maxPendingTimeouts;
- //7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志
- if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
- WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
- reportTooManyInstances();
- }
- }
-
- //初始化时间轮环形数组
- //@param ticksPerWheel
- private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
- //ticksPerWheel不能大于2^30
- checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
- //将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂
- ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
- //创建时间轮环形数组
- HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
- for (int i = 0; i < wheel.length; i ++) {
- wheel[i] = new HashedWheelBucket();
- }
- return wheel;
- }
- ...
- }
复制代码
7.HashedWheelTimer添加任务和实行任务
(1)添加延时任务
(2)实行延时任务
(1)添加延时任务
步调一:将需要实行的延时任务数pendingTimeouts+1
步调二:如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
步调三:启动工作线程,也就是启动时间轮
步调四:盘算被添加的延时任务的截止时间=当前时间+当前任务实行的耽误时间-时间轮启动的时间
步调五:创建延时任务实例HashedWheelTimeout
步调六:将延时任务实例添加到延时任务队列timeouts中

注意:添加时会将延时任务添加到延时任务队列timeouts中。这个延时任务队列timeouts将会在下一个滴答声中举行处置惩罚(指针的下一次转动)。在处置惩罚过程中,所有列队的HashedWheelTimeout将被添加到正确的HashedWheelBucket。
- public class HashedWheelTimer implements Timer {
- private final AtomicLong pendingTimeouts = new AtomicLong(0);//需要执行的延时任务数
- private final long maxPendingTimeouts;
- private volatile long startTime;
- private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
- private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();//延时任务队列
- ...
- //添加延时任务
- //@param task 任务
- //@param delay 延时时间
- //@param unit 延时时间单位
- @Override
- public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
- checkNotNull(task, "task");
- checkNotNull(unit, "unit");
- //1.将需要执行的延时任务数pendingTimeouts + 1
- long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
-
- //2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
- if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
- pendingTimeouts.decrementAndGet();
- throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
- + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")");
- }
-
- //3.启动工作线程,即启动时间轮
- start();
-
- //将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)
- //在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket
- //4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间
- long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
- if (delay > 0 && deadline < 0) {
- deadline = Long.MAX_VALUE;
- }
-
- //5.创建延时任务实例HashedWheelTimeout
- HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
-
- //6.将延时任务实例添加到延时任务队列中
- timeouts.add(timeout);
- return timeout;
- }
-
- //Starts the background thread explicitly.
- //The background thread will start automatically on demand even if you did not call this method.
- //@throws IllegalStateException if this timer has been #stop() stopped already
- public void start() {
- switch (WORKER_STATE_UPDATER.get(this)) {
- case WORKER_STATE_INIT:
- if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
- //启动工作线程,即启动时间轮
- workerThread.start();
- }
- break;
- case WORKER_STATE_STARTED:
- break;
- case WORKER_STATE_SHUTDOWN:
- throw new IllegalStateException("cannot be started once stopped");
- default:
- throw new Error("Invalid WorkerState");
- }
- //Wait until the startTime is initialized by the worker.
- while (startTime == 0) {
- try {
- //阻塞时间轮的工作线程
- startTimeInitialized.await();
- } catch (InterruptedException ignore) {
- //Ignore - it will be ready very soon.
- }
- }
- }
- ...
- }
复制代码 (2)实行延时任务
步调一:纪录时间轮启动的时间startTime
步调二:开始do while循环,唤醒被阻塞的start()方法,关照时间轮已经启动完毕
步调三:阻塞等待下一次指针转动的时间
步调四:盘算当前指针指向的时间轮的槽位idx
步调五:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
步调六:获取当前指针指向的时间槽HashedWheelBucket
步调七:遍历延时任务队列timeouts,将此中的延时任务保存到对应的槽的链表中,根据延时时间盘算对应的时间槽和remainingRounds圈数
步调八:运行目前指针指向的时间槽中的链表的任务,通过taskExecutor线程池去实行到期的任务
步调九:到期的和取消的延时任务从链表中移除并将pendingTimeouts--
步调十:时间轮指针的总转动次数tick++,继续do while循环
步调十一:扫除时间轮中不需要处置惩罚的任务,保存到unprocessedTimeouts中
步调十二:将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中
步调十三:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |