Netty源码—10.Netty工具之时间轮

打印 上一主题 下一主题

主题 1760|帖子 1760|积分 5280

1.什么是时间轮


简单来说,时间轮是一个高效使用线程资源进行批量化调度的调度器。首先把大批量的调度使命全部绑定到同一个调度器上,然后使用这个调度器对全部使命进行管理、触发、以及运行,所以时间轮能高效管理各种延时使命、周期使命、通知使命。

时间轮是以时间作为刻度构成的一个环形队列,所以叫做时间轮。这个环形队列通过一个HashedWheelBucket[]数组来实现,数组的每个元素称为槽,每个槽可以存放一个定时使命列表,叫HashedWheelBucket。HashedWheelBucket是一个双向链表,链表的每个节点表示一个定时使命项HashedWheelTimeout。在HashedWheelTimeout中封装了真正的定时使命TimerTask。

时间轮由多个时间格构成,每个时间格代表当前时间轮的基本时间跨度ticketDuration,此中时间轮的时间格的个数是固定的。

如下图示,有16个时间格(槽),假设每个时间格的单位是1s,那么整个时间轮走完一圈需要16s。每秒钟(即时间格的单位也可以为1ms、1min、1h等)指针会沿着顺时针方向转动一格。通过指针移动来获得每个时间格中的使命列表,然后遍历这个时间格内的双向链表的每个使命并实行,依此循环。




2.HashedWheelTimer是什么


Netty的HashedWheelTimer是一个大抵的定时器实现,之所以称为大抵的实现是由于该时间轮并没有严格定时地实行定时使命,而是在每隔一个时间隔断之后的时间节点实行,并实行当前时间节点之前到期的定时使命。

不外具体的定时使命的时间实行精度,可以通过调治HashedWheelTimer构造方法的时间隔断的大小来进行调治。在大多数网络应用的情况下,由于IO延迟的存在,所以并不会严格要求具体的时间实行精度。因此默认100ms的时间隔断可以满意大多数情况,不需要再花精神去调治该时间精度。

3.HashedWheelTimer的使用


  1. public class HashedWheelTimerTest {
  2.     //构建HashedWheelTimer时间轮
  3.     //最后通过HASHED_WHEEL_TIMER.newTimeout()方法把需要延迟执行的任务添加到时间轮中
  4.     private static final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(
  5.         new DefaultThreadFactory("demo-timer"),//threadFactory参数表示创建处理任务的线程工厂
  6.         100,//tickDuration参数表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格
  7.         TimeUnit.MILLISECONDS,
  8.         512,//ticksPerWheel参数表示时间轮上一共有多少个时间格,分配的时间格越多,占用内存空间就越大,这里是512
  9.         true//leakDetection参数表示是否开启内存泄漏检测
  10.     );
  11.     public static void main(String[] args) {
  12.         System.out.println("延时任务提交");
  13.         //延时多久执行
  14.         long delay = 10L;
  15.         HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
  16.             @Override
  17.             public void run(Timeout timeout) throws Exception {
  18.                 System.out.println("延时任务触发");
  19.             }
  20.         }, delay, TimeUnit.SECONDS);
  21.     }
  22. }
复制代码

4.HashedWheelTimer的运行流程


步调一:初始化时间轮
步调二:启动时间轮
步调三:添加使命,保存到延时使命队列
步调四:时间轮指针休眠阻塞,实现转动
步调五:休眠结束,指针指向下一个时间格(槽)
步调六:将已经取消的使命从对应的槽中移除
步调七:将延时使命队列的使命添加到对应的槽中
步调八:实行时间轮指针指向当前槽的到期使命





5.HashedWheelTimer中的关键字段


字段一:wheel
wheel是一个HashedWheelBucket数组,默认的数组大小是512。可以认为wheel是一个TimerTask的哈希表,它的哈希函数是使命的截止日期。所以每个时间轮的时间格数ticksPerWheel默认是512。

字段二:tickDuration
时间格跨度,默认100ms。

字段三:ticksPerWheel
时间轮的格子数,默认512。

字段四:maxPendingTimeouts
时间轮中使命的最大数目。

字段五:deadline
延时使命的截止时间,值为当前时间 + 延时使命的延时时间 - 时间轮启动时间。

字段六:tick
时间轮启动以来指针总的转动次数。

字段七:remainingRounds
槽中延时使命剩余的圈(轮)数,为0时则表示需要实行延时使命了。

6.HashedWheelTimer的构造方法


步调一:构造参数校验及给现实实行延时使命的线程池taskExecutor赋值

步调二:将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂

步调三:初始化HashedWheelBucket数组wheel

步调四:校验tickDuration和ticksPerWheel

步调五:创建工作线程workerThread,用于指针转动和触发实行时间格里的延时使命

步调六:给时间轮中延时使命的最大数目maxPendingTimeouts赋值

步调七:查抄HashedWheelTimer的实例数目,如果大于64则打印error日志




  1. //4.1.73.Final
  2. public class HashedWheelTimer implements Timer {
  3.     private final HashedWheelBucket[] wheel;
  4.     private final int mask;
  5.     private final long tickDuration;
  6.     private final Thread workerThread;
  7.     private final ResourceLeakTracker<HashedWheelTimer> leak;
  8.     private final Worker worker = new Worker();
  9.     private final long maxPendingTimeouts;
  10.     private final Executor taskExecutor;
  11.     ...
  12.     //Creates a new timer.
  13.     //@param threadFactory        创建线程的工厂
  14.     //@param tickDuration         每格的时间间隔,默认100ms,0.1秒
  15.     //@param unit                 时间单位,默认为毫秒
  16.     //@param ticksPerWheel        时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算
  17.     //@param leakDetection        如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略
  18.     //@param maxPendingTimeouts   最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1
  19.     //@param taskExecutor         任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它
  20.     //@throws NullPointerException     if either of threadFactory and unit is null
  21.     //@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0
  22.     public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) {
  23.         //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值
  24.         checkNotNull(threadFactory, "threadFactory");
  25.         checkNotNull(unit, "unit");
  26.         checkPositive(tickDuration, "tickDuration");
  27.         checkPositive(ticksPerWheel, "ticksPerWheel");
  28.         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
  29.         //2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算
  30.         //3.初始化时间轮wheel
  31.         wheel = createWheel(ticksPerWheel);
  32.         //mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能
  33.         mask = wheel.length - 1;
  34.       
  35.         //4.校验tickDuration和ticksPerWheel
  36.         //Convert tickDuration to nanos.
  37.         long duration = unit.toNanos(tickDuration);
  38.         //防止溢出
  39.         //tickDuration * ticksPerWheel必须小于Long.MAX_VALUE
  40.         if (duration >= Long.MAX_VALUE / wheel.length) {
  41.             throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));
  42.         }
  43.         //tickDuration不能小于1ms
  44.         if (duration < MILLISECOND_NANOS) {
  45.             logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
  46.             this.tickDuration = MILLISECOND_NANOS;
  47.         } else {
  48.             this.tickDuration = duration;
  49.         }
  50.         //5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行
  51.         workerThread = threadFactory.newThread(worker);
  52.         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
  53.         //6.给时间轮中任务的最大数量maxPendingTimeouts赋值
  54.         this.maxPendingTimeouts = maxPendingTimeouts;
  55.         //7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志
  56.         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
  57.             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
  58.             reportTooManyInstances();
  59.         }
  60.     }
  61.    
  62.     //初始化时间轮环形数组
  63.     //@param ticksPerWheel
  64.     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
  65.         //ticksPerWheel不能大于2^30
  66.         checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
  67.         //将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂
  68.         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
  69.         //创建时间轮环形数组
  70.         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
  71.         for (int i = 0; i < wheel.length; i ++) {
  72.             wheel[i] = new HashedWheelBucket();
  73.         }
  74.         return wheel;
  75.     }
  76.     ...
  77. }
复制代码

7.HashedWheelTimer添加使命和实行使命


(1)添加延时使命


步调一:将需要实行的延时使命数pendingTimeouts+1

步调二:如果pendingTimeouts超过maxPendingTimeouts,则抛出异常

步调三:启动工作线程,也就是启动时间轮

步调四:计算被添加的延时使命的截止时间=当前时间+当前使命实行的延迟时间-时间轮启动的时间

步调五:创建延时使命实例HashedWheelTimeout

步调六:将延时使命实例添加到延时使命队列timeouts中




注意:添加时会将延时使命添加到延时使命队列timeouts中。这个延时使命队列timeouts将会在下一个滴答声中进行处理惩罚(指针的下一次转动)。在处理惩罚过程中,全部排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket。

  1. public class HashedWheelTimer implements Timer {
  2.     private final AtomicLong pendingTimeouts = new AtomicLong(0);//需要执行的延时任务数
  3.     private final long maxPendingTimeouts;
  4.     private volatile long startTime;
  5.     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
  6.     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();//延时任务队列
  7.     ...
  8.     //添加延时任务
  9.     //@param task 任务
  10.     //@param delay 延时时间
  11.     //@param unit 延时时间单位
  12.     @Override
  13.     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
  14.         checkNotNull(task, "task");
  15.         checkNotNull(unit, "unit");
  16.         //1.将需要执行的延时任务数pendingTimeouts + 1
  17.         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
  18.       
  19.         //2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
  20.         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
  21.             pendingTimeouts.decrementAndGet();
  22.             throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
  23.                 + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")");
  24.         }
  25.       
  26.         //3.启动工作线程,即启动时间轮
  27.         start();
  28.      
  29.         //将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)
  30.         //在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket
  31.         //4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间
  32.         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
  33.         if (delay > 0 && deadline < 0) {
  34.             deadline = Long.MAX_VALUE;
  35.         }
  36.       
  37.         //5.创建延时任务实例HashedWheelTimeout
  38.         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  39.       
  40.         //6.将延时任务实例添加到延时任务队列中
  41.         timeouts.add(timeout);
  42.         return timeout;
  43.     }
  44.    
  45.     //Starts the background thread explicitly.  
  46.     //The background thread will start automatically on demand even if you did not call this method.
  47.     //@throws IllegalStateException if this timer has been #stop() stopped already
  48.     public void start() {
  49.         switch (WORKER_STATE_UPDATER.get(this)) {
  50.         case WORKER_STATE_INIT:
  51.             if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
  52.                 //启动工作线程,即启动时间轮
  53.                 workerThread.start();
  54.             }
  55.             break;
  56.         case WORKER_STATE_STARTED:
  57.             break;
  58.         case WORKER_STATE_SHUTDOWN:
  59.             throw new IllegalStateException("cannot be started once stopped");
  60.         default:
  61.             throw new Error("Invalid WorkerState");
  62.         }
  63.         //Wait until the startTime is initialized by the worker.
  64.         while (startTime == 0) {
  65.             try {
  66.                 //阻塞时间轮的工作线程
  67.                 startTimeInitialized.await();
  68.             } catch (InterruptedException ignore) {
  69.                 //Ignore - it will be ready very soon.
  70.             }
  71.         }
  72.     }
  73.     ...
  74. }
复制代码

(2)实行延时使命


步调一:记载时间轮启动的时间startTime

步调二:开始do while循环,唤醒被阻塞的start()方法,通知时间轮已经启动完毕

步调三:阻塞等待下一次指针转动的时间

步调四:计算当前指针指向的时间轮的槽位idx

步调五:将已经取消的使命从HashedWheelBucket数组中移除,并将pendingTimeouts使命数 - 1

步调六:获取当前指针指向的时间槽HashedWheelBucket

步调七:遍历延时使命队列timeouts,将此中的延时使命保存到对应的槽的链表中,根据延时时间计算对应的时间槽和remainingRounds圈数

步调八:运行现在指针指向的时间槽中的链表的使命,通过taskExecutor线程池去实行到期的使命

步调九:到期的和取消的延时使命从链表中移除并将pendingTimeouts--

步调十:时间轮指针的总转动次数tick++,继续do while循环

步调十一:扫除时间轮中不需要处理惩罚的使命,保存到unprocessedTimeouts中

步调十二:将延时使命队列中还未添加到时间轮的延时使命保存到unprocessedTimeouts中

步调十三:将已经取消的使命从HashedWheelBucket数组中移除,并将pendingTimeouts使命数 - 1




  1. public class HashedWheelTimer implements Timer {
  2.     private volatile long startTime;
  3.     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
  4.    
  5.     ...
  6.     //指针转动和执行延时任务的线程
  7.     private final class Worker implements Runnable {
  8.         //用于记录未执行的延时任务
  9.         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
  10.    
  11.         //总的tick数(指针嘀嗒的次数)
  12.         private long tick;
  13.    
  14.         @Override
  15.         public void run() {
  16.             //1.记录时间轮启动的时间startTime
  17.             startTime = System.nanoTime();
  18.             if (startTime == 0) {
  19.                 //我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0
  20.                 startTime = 1;
  21.             }
  22.    
  23.             //2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕
  24.             startTimeInitialized.countDown();
  25.         
  26.             //一直执行do while循环,直到时间轮被关闭
  27.             do {
  28.                 //3.阻塞等待下一次指针转动的时间
  29.                 //这里会休眠tick的时间,模拟指针走动
  30.                 final long deadline = waitForNextTick();
  31.                 if (deadline > 0) {
  32.                     //4.计算当前指针指向的时间轮槽位idx
  33.                     int idx = (int) (tick & mask);
  34.                     //5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
  35.                     processCancelledTasks();
  36.                     //6.获取当前指针指向的时间槽HashedWheelBucket
  37.                     HashedWheelBucket bucket = wheel[idx];
  38.                     //7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
  39.                     transferTimeoutsToBuckets();
  40.                     //8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务
  41.                     //9.到期的和取消的延时任务从链表中移除并将pendingTimeouts--
  42.                     bucket.expireTimeouts(deadline);
  43.                     //10.时间轮指针的总转动次数tick++
  44.                     tick++;
  45.                 }
  46.             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
  47.    
  48.             //Fill the unprocessedTimeouts so we can return them from stop() method.
  49.             //11.清除时间轮中不需要处理的任务
  50.             for (HashedWheelBucket bucket: wheel) {
  51.                 bucket.clearTimeouts(unprocessedTimeouts);
  52.             }
  53.             //12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中
  54.             //遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中
  55.             for (;;) {
  56.                 HashedWheelTimeout timeout = timeouts.poll();
  57.                 if (timeout == null) {
  58.                     break;
  59.                 }
  60.                 if (!timeout.isCancelled()) {
  61.                     //如果延时任务没被取消,记录到未执行的任务Set集合中
  62.                     unprocessedTimeouts.add(timeout);
  63.                 }
  64.             }
  65.             //13.处理被取消的任务
  66.             processCancelledTasks();
  67.         }
  68.    
  69.         //将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置
  70.         //也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
  71.         private void transferTimeoutsToBuckets() {
  72.             //每次转移10w个延时任务
  73.             for (int i = 0; i < 100000; i++) {
  74.                 //从队列中出队一个延时任务
  75.                 HashedWheelTimeout timeout = timeouts.poll();
  76.                 if (timeout == null) {
  77.                     //all processed
  78.                     break;
  79.                 }
  80.                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
  81.                     //Was cancelled in the meantime.
  82.                     continue;
  83.                 }
  84.                 //到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔
  85.                 long calculated = timeout.deadline / tickDuration;
  86.                 //tick已经走了的时间格,到期一共还需要需要走多少圈
  87.                 timeout.remainingRounds = (calculated - tick) / wheel.length;
  88.                 //如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行
  89.                 final long ticks = Math.max(calculated, tick);
  90.                 //槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1
  91.                 int stopIndex = (int) (ticks & mask);
  92.                 //根据索引该任务应该放到的槽
  93.                 HashedWheelBucket bucket = wheel[stopIndex];
  94.                 //将任务添加到槽中,链表末尾
  95.                 bucket.addTimeout(timeout);
  96.             }
  97.         }
  98.    
  99.         //处理取消掉的延时任务
  100.         //将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
  101.         private void processCancelledTasks() {
  102.             for (;;) {
  103.                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
  104.                 if (timeout == null) {
  105.                     //all processed
  106.                     break;
  107.                 }
  108.                 try {
  109.                     timeout.remove();
  110.                 } catch (Throwable t) {
  111.                     if (logger.isWarnEnabled()) {
  112.                         logger.warn("An exception was thrown while process a cancellation task", t);
  113.                     }
  114.                 }
  115.             }
  116.         }
  117.    
  118.         //从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来
  119.         private long waitForNextTick() {
  120.             //deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔
  121.             long deadline = tickDuration * (tick + 1);
  122.    
  123.             for (;;) {
  124.                 //计算当前时间距离启动时间的时间间隔
  125.                 final long currentTime = System.nanoTime() - startTime;
  126.                 //距离下一次指针跳动还需休眠多长时间
  127.                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
  128.                 //到了指针调到下一个槽位的时间
  129.                 if (sleepTimeMs <= 0) {
  130.                     if (currentTime == Long.MIN_VALUE) {
  131.                         return -Long.MAX_VALUE;
  132.                     } else {
  133.                         return currentTime;
  134.                     }
  135.                 }
  136.    
  137.                 try {
  138.                     //表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来
  139.                     Thread.sleep(sleepTimeMs);
  140.                 } catch (InterruptedException ignored) {
  141.                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
  142.                         return Long.MIN_VALUE;
  143.                     }
  144.                 }
  145.             }
  146.         }
  147.    
  148.         //记录未执行的延时任务
  149.         public Set<Timeout> unprocessedTimeouts() {
  150.             return Collections.unmodifiableSet(unprocessedTimeouts);
  151.         }
  152.     }
  153.    
  154.     private static final class HashedWheelBucket {
  155.         ...
  156.         public void expireTimeouts(long deadline) {
  157.             HashedWheelTimeout timeout = head;
  158.         
  159.             //process all timeouts
  160.             while (timeout != null) {
  161.                 HashedWheelTimeout next = timeout.next;
  162.                 if (timeout.remainingRounds <= 0) {
  163.                     next = remove(timeout);
  164.                     if (timeout.deadline <= deadline) {
  165.                         //通过线程池执行任务
  166.                         timeout.expire();
  167.                     } else {
  168.                         //The timeout was placed into a wrong slot. This should never happen.
  169.                         throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
  170.                     }
  171.                 } else if (timeout.isCancelled()) {
  172.                     next = remove(timeout);
  173.                 } else {
  174.                     timeout.remainingRounds --;
  175.                 }
  176.                 timeout = next;
  177.             }
  178.         }
  179.         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
  180.             HashedWheelTimeout next = timeout.next;
  181.             //remove timeout that was either processed or cancelled by updating the linked-list
  182.             if (timeout.prev != null) {
  183.                 timeout.prev.next = next;
  184.             }
  185.             if (timeout.next != null) {
  186.                 timeout.next.prev = timeout.prev;
  187.             }
  188.             if (timeout == head) {
  189.                 //if timeout is also the tail we need to adjust the entry too
  190.                 if (timeout == tail) {
  191.                     tail = null;
  192.                     head = null;
  193.                 } else {
  194.                     head = next;
  195.                 }
  196.             } else if (timeout == tail) {
  197.                 //if the timeout is the tail modify the tail to be the prev node.
  198.                 tail = timeout.prev;
  199.             }
  200.             //null out prev, next and bucket to allow for GC.
  201.             timeout.prev = null;
  202.             timeout.next = null;
  203.             timeout.bucket = null;
  204.             timeout.timer.pendingTimeouts.decrementAndGet();
  205.             return next;
  206.         }
  207.         ...   
  208.     }
  209.    
  210.     private static final class HashedWheelTimeout implements Timeout, Runnable {
  211.         private final TimerTask task;
  212.         private final HashedWheelTimer timer;
  213.         ...
  214.         public void expire() {
  215.             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
  216.                 return;
  217.             }
  218.             try {
  219.                 timer.taskExecutor.execute(this);
  220.             } catch (Throwable t) {
  221.                 if (logger.isWarnEnabled()) {
  222.                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t);
  223.                 }
  224.             }
  225.         }
  226.         ...
  227.     }
  228.     ...
  229. }
复制代码

8.HashedWheelTimer的完整源码


  1. //Netty时间轮
  2. public class HashedWheelTimer implements Timer {
  3.     static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);
  4.     private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
  5.     private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
  6.     private static final int INSTANCE_COUNT_LIMIT = 64;
  7.     private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
  8.     private static final ResourceLeakDetector<HashedWheelTimer> leakDetector =
  9.         ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);
  10.     private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
  11.         AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
  12.     private final ResourceLeakTracker<HashedWheelTimer> leak;
  13.     //指针转动和延时任务执行的线程
  14.     private final Worker worker = new Worker();
  15.     //worker任务封装的工作线程,用于指针转动和触发时间格里的延时任务的执行
  16.     private final Thread workerThread;
  17.     public static final int WORKER_STATE_INIT = 0;
  18.     public static final int WORKER_STATE_STARTED = 1;
  19.     public static final int WORKER_STATE_SHUTDOWN = 2;
  20.     @SuppressWarnings({"unused", "FieldMayBeFinal"})
  21.     private volatile int workerState;//0 - init, 1 - started, 2 - shut down
  22.     //每个时间格的时间跨度,默认为100ms
  23.     private final long tickDuration;
  24.     //时间轮(环形数组),HashedWheelBucket为每个时间格的槽
  25.     private final HashedWheelBucket[] wheel;
  26.     private final int mask;
  27.     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
  28.     //延时任务队列,队列中为等待被添加到时间轮的延时任务
  29.     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
  30.     //保存已经取消的延时任务的队列
  31.     private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
  32.     //记录当前的任务数
  33.     private final AtomicLong pendingTimeouts = new AtomicLong(0);
  34.     //最大的任务数
  35.     private final long maxPendingTimeouts;
  36.     //执行延时任务的线程池
  37.     private final Executor taskExecutor;
  38.     //工作线程启动时间
  39.     private volatile long startTime;
  40.     // 构造器 start //
  41.     public HashedWheelTimer() {
  42.         this(Executors.defaultThreadFactory());
  43.     }
  44.     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
  45.         this(Executors.defaultThreadFactory(), tickDuration, unit);
  46.     }
  47.     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
  48.         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
  49.     }
  50.     //使用默认的tickDuration(时间格跨度默认为100ms)和默认的ticksPerWheel(时间格总数默认为512)创建一个新的计时器(时间轮)
  51.     public HashedWheelTimer(ThreadFactory threadFactory) {
  52.         this(threadFactory, 100, TimeUnit.MILLISECONDS);
  53.     }
  54.     public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
  55.         this(threadFactory, tickDuration, unit, 512);
  56.     }
  57.     public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
  58.         this(threadFactory, tickDuration, unit, ticksPerWheel, true);
  59.     }
  60.     public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
  61.         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
  62.     }
  63.     public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
  64.         this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts, ImmediateExecutor.INSTANCE);
  65.     }
  66.     //Creates a new timer.
  67.     //@param threadFactory        创建线程的工厂
  68.     //@param tickDuration         每格的时间间隔,默认100ms,0.1秒
  69.     //@param unit                 时间单位,默认为毫秒
  70.     //@param ticksPerWheel        时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算
  71.     //@param leakDetection        如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略
  72.     //@param maxPendingTimeouts   最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1
  73.     //@param taskExecutor         任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它
  74.     //@throws NullPointerException     if either of threadFactory and unit is null
  75.     //@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0
  76.     public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) {
  77.         //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值
  78.         checkNotNull(threadFactory, "threadFactory");
  79.         checkNotNull(unit, "unit");
  80.         checkPositive(tickDuration, "tickDuration");
  81.         checkPositive(ticksPerWheel, "ticksPerWheel");
  82.         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
  83.         //2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算
  84.         //3.初始化时间轮wheel
  85.         wheel = createWheel(ticksPerWheel);
  86.         //mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能
  87.         mask = wheel.length - 1;
  88.       
  89.         //4.校验tickDuration和ticksPerWheel
  90.         //Convert tickDuration to nanos.
  91.         long duration = unit.toNanos(tickDuration);
  92.         //防止溢出
  93.         //tickDuration * ticksPerWheel必须小于Long.MAX_VALUE
  94.         if (duration >= Long.MAX_VALUE / wheel.length) {
  95.             throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));
  96.         }
  97.         //tickDuration不能小于1ms
  98.         if (duration < MILLISECOND_NANOS) {
  99.             logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
  100.             this.tickDuration = MILLISECOND_NANOS;
  101.         } else {
  102.             this.tickDuration = duration;
  103.         }
  104.         //5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行
  105.         workerThread = threadFactory.newThread(worker);
  106.         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
  107.         //6.给时间轮中任务的最大数量maxPendingTimeouts赋值
  108.         this.maxPendingTimeouts = maxPendingTimeouts;
  109.         //7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志
  110.         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
  111.             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
  112.             reportTooManyInstances();
  113.         }
  114.     }
  115.     // 构造器 end //
  116.    
  117.     @Override
  118.     protected void finalize() throws Throwable {
  119.         try {
  120.             super.finalize();
  121.         } finally {
  122.             //This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown.
  123.             //If we have not yet shutdown then we want to make sure we decrement the active instance count.
  124.             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
  125.                 INSTANCE_COUNTER.decrementAndGet();
  126.             }
  127.         }
  128.     }
  129.     //初始化时间轮环形数组
  130.     //@param ticksPerWheel
  131.     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
  132.         //ticksPerWheel不能大于2^30
  133.         checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
  134.         //将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂
  135.         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
  136.         //创建时间轮环形数组
  137.         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
  138.         for (int i = 0; i < wheel.length; i ++) {
  139.             wheel[i] = new HashedWheelBucket();
  140.         }
  141.         return wheel;
  142.     }
  143.     //将ticksPerWheel(时间轮上的时间格数)向上取值为2的次幂
  144.     private static int normalizeTicksPerWheel(int ticksPerWheel) {
  145.         int normalizedTicksPerWheel = 1;
  146.         while (normalizedTicksPerWheel < ticksPerWheel) {
  147.             normalizedTicksPerWheel <<= 1;
  148.         }
  149.         return normalizedTicksPerWheel;
  150.     }
  151.     //显式启动后台线程
  152.     //即使没有调用此方法,后台线程也会按需自动启动
  153.     //Starts the background thread explicitly.  
  154.     //The background thread will start automatically on demand even if you did not call this method.
  155.     //@throws IllegalStateException if this timer has been #stop() stopped already
  156.     public void start() {
  157.         switch (WORKER_STATE_UPDATER.get(this)) {
  158.         case WORKER_STATE_INIT:
  159.             if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
  160.                 //启动工作线程,即启动时间轮
  161.                 workerThread.start();
  162.             }
  163.             break;
  164.         case WORKER_STATE_STARTED:
  165.             break;
  166.         case WORKER_STATE_SHUTDOWN:
  167.             throw new IllegalStateException("cannot be started once stopped");
  168.         default:
  169.             throw new Error("Invalid WorkerState");
  170.         }
  171.         //Wait until the startTime is initialized by the worker.
  172.         while (startTime == 0) {
  173.             try {
  174.                 //阻塞时间轮的工作线程
  175.                 startTimeInitialized.await();
  176.             } catch (InterruptedException ignore) {
  177.                 //Ignore - it will be ready very soon.
  178.             }
  179.         }
  180.     }
  181.     @Override
  182.     public Set<Timeout> stop() {
  183.         if (Thread.currentThread() == workerThread) {
  184.             throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName());
  185.         }
  186.         if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
  187.             //workerState can be 0 or 2 at this moment - let it always be 2.
  188.             if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
  189.                 INSTANCE_COUNTER.decrementAndGet();
  190.                 if (leak != null) {
  191.                     boolean closed = leak.close(this);
  192.                     assert closed;
  193.                 }
  194.             }
  195.             return Collections.emptySet();
  196.         }
  197.         try {
  198.             boolean interrupted = false;
  199.             while (workerThread.isAlive()) {
  200.                 workerThread.interrupt();
  201.                 try {
  202.                     workerThread.join(100);
  203.                 } catch (InterruptedException ignored) {
  204.                     interrupted = true;
  205.                 }
  206.             }
  207.             if (interrupted) {
  208.                 Thread.currentThread().interrupt();
  209.             }
  210.         } finally {
  211.             INSTANCE_COUNTER.decrementAndGet();
  212.             if (leak != null) {
  213.                 boolean closed = leak.close(this);
  214.                 assert closed;
  215.             }
  216.         }
  217.         return worker.unprocessedTimeouts();
  218.     }
  219.     //添加延时任务
  220.     //@param task 任务
  221.     //@param delay 延时时间
  222.     //@param unit 延时时间单位
  223.     @Override
  224.     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
  225.         checkNotNull(task, "task");
  226.         checkNotNull(unit, "unit");
  227.         //1.将需要执行的延时任务数pendingTimeouts + 1
  228.         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
  229.       
  230.         //2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
  231.         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
  232.             pendingTimeouts.decrementAndGet();
  233.             throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
  234.                 + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")");
  235.         }
  236.       
  237.         //3.启动工作线程,即启动时间轮
  238.         start();
  239.      
  240.         //将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)
  241.         //在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket
  242.         //4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间
  243.         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
  244.         if (delay > 0 && deadline < 0) {
  245.             deadline = Long.MAX_VALUE;
  246.         }
  247.       
  248.         //5.创建延时任务实例HashedWheelTimeout
  249.         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  250.       
  251.         //6.将延时任务实例添加到延时任务队列中
  252.         timeouts.add(timeout);
  253.         return timeout;
  254.     }
  255.     //Returns the number of pending timeouts of this Timer.
  256.     public long pendingTimeouts() {
  257.         return pendingTimeouts.get();
  258.     }
  259.     private static void reportTooManyInstances() {
  260.         if (logger.isErrorEnabled()) {
  261.             String resourceType = simpleClassName(HashedWheelTimer.class);
  262.             logger.error("You are creating too many " + resourceType + " instances. " +
  263.                 resourceType + " is a shared resource that must be reused across the JVM, " +
  264.                 "so that only a few instances are created.");
  265.         }
  266.     }
  267.     //指针转动和延时任务执行的线程
  268.     private final class Worker implements Runnable {
  269.         //用于记录未执行的延时任务
  270.         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
  271.    
  272.         //总的tick数(指针嘀嗒的次数)
  273.         private long tick;
  274.    
  275.         @Override
  276.         public void run() {
  277.             //1.记录时间轮启动的时间startTime
  278.             startTime = System.nanoTime();
  279.             if (startTime == 0) {
  280.                 //我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0
  281.                 startTime = 1;
  282.             }
  283.    
  284.             //2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕
  285.             startTimeInitialized.countDown();
  286.         
  287.             //一直执行do while循环,直到时间轮被关闭
  288.             do {
  289.                 //3.阻塞等待下一次指针转动的时间
  290.                 //这里会休眠tick的时间,模拟指针走动
  291.                 final long deadline = waitForNextTick();
  292.                 if (deadline > 0) {
  293.                     //4.计算当前指针指向的时间轮槽位idx
  294.                     int idx = (int) (tick & mask);
  295.                     //5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
  296.                     processCancelledTasks();
  297.                     //6.获取当前指针指向的时间槽HashedWheelBucket
  298.                     HashedWheelBucket bucket = wheel[idx];
  299.                     //7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
  300.                     transferTimeoutsToBuckets();
  301.                     //8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务
  302.                     //9.到期的和取消的延时任务从链表中移除并将pendingTimeouts--
  303.                     bucket.expireTimeouts(deadline);
  304.                     //10.时间轮指针的总转动次数tick++
  305.                     tick++;
  306.                 }
  307.             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
  308.    
  309.             //Fill the unprocessedTimeouts so we can return them from stop() method.
  310.             //11.清除时间轮中不需要处理的任务
  311.             for (HashedWheelBucket bucket: wheel) {
  312.                 bucket.clearTimeouts(unprocessedTimeouts);
  313.             }
  314.             //12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中
  315.             //遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中
  316.             for (;;) {
  317.                 HashedWheelTimeout timeout = timeouts.poll();
  318.                 if (timeout == null) {
  319.                     break;
  320.                 }
  321.                 if (!timeout.isCancelled()) {
  322.                     //如果延时任务没被取消,记录到未执行的任务Set集合中
  323.                     unprocessedTimeouts.add(timeout);
  324.                 }
  325.             }
  326.             //13.处理被取消的任务
  327.             processCancelledTasks();
  328.         }
  329.    
  330.         //将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置
  331.         //也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
  332.         private void transferTimeoutsToBuckets() {
  333.             //每次转移10w个延时任务
  334.             for (int i = 0; i < 100000; i++) {
  335.                 //从队列中出队一个延时任务
  336.                 HashedWheelTimeout timeout = timeouts.poll();
  337.                 if (timeout == null) {
  338.                     //all processed
  339.                     break;
  340.                 }
  341.                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
  342.                     //Was cancelled in the meantime.
  343.                     continue;
  344.                 }
  345.                 //到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔
  346.                 long calculated = timeout.deadline / tickDuration;
  347.                 //tick已经走了的时间格,到期一共还需要需要走多少圈
  348.                 timeout.remainingRounds = (calculated - tick) / wheel.length;
  349.                 //如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行
  350.                 final long ticks = Math.max(calculated, tick);
  351.                 //槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1
  352.                 int stopIndex = (int) (ticks & mask);
  353.                 //根据索引该任务应该放到的槽
  354.                 HashedWheelBucket bucket = wheel[stopIndex];
  355.                 //将任务添加到槽中,链表末尾
  356.                 bucket.addTimeout(timeout);
  357.             }
  358.         }
  359.    
  360.         //处理取消掉的延时任务
  361.         //将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
  362.         private void processCancelledTasks() {
  363.             for (;;) {
  364.                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
  365.                 if (timeout == null) {
  366.                     //all processed
  367.                     break;
  368.                 }
  369.                 try {
  370.                     timeout.remove();
  371.                 } catch (Throwable t) {
  372.                     if (logger.isWarnEnabled()) {
  373.                         logger.warn("An exception was thrown while process a cancellation task", t);
  374.                     }
  375.                 }
  376.             }
  377.         }
  378.    
  379.         //从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来
  380.         private long waitForNextTick() {
  381.             //deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔
  382.             long deadline = tickDuration * (tick + 1);
  383.    
  384.             for (;;) {
  385.                 //计算当前时间距离启动时间的时间间隔
  386.                 final long currentTime = System.nanoTime() - startTime;
  387.                 //距离下一次指针跳动还需休眠多长时间
  388.                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
  389.                 //到了指针调到下一个槽位的时间
  390.                 if (sleepTimeMs <= 0) {
  391.                     if (currentTime == Long.MIN_VALUE) {
  392.                         return -Long.MAX_VALUE;
  393.                     } else {
  394.                         return currentTime;
  395.                     }
  396.                 }
  397.    
  398.                 try {
  399.                     //表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来
  400.                     Thread.sleep(sleepTimeMs);
  401.                 } catch (InterruptedException ignored) {
  402.                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
  403.                         return Long.MIN_VALUE;
  404.                     }
  405.                 }
  406.             }
  407.         }
  408.    
  409.         //记录未执行的延时任务
  410.         public Set<Timeout> unprocessedTimeouts() {
  411.             return Collections.unmodifiableSet(unprocessedTimeouts);
  412.         }
  413.     }
  414.    
  415.     //延时任务
  416.     private static final class HashedWheelTimeout implements Timeout, Runnable {
  417.         private static final int ST_INIT = 0;
  418.         private static final int ST_CANCELLED = 1;
  419.         private static final int ST_EXPIRED = 2;
  420.         private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
  421.             AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
  422.         private final HashedWheelTimer timer;
  423.         private final TimerTask task;
  424.         //任务执行的截止时间 = 当前时间 + 延时任务延时时间 - 时间轮启动时间
  425.         private final long deadline;
  426.         @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
  427.         private volatile int state = ST_INIT;
  428.         //剩下的圈(轮)数
  429.         //remainingRounds将由Worker.transferTimeoutsToBuckets()在HashedWheelTimeout被添加到正确的HashedWheelBucket之前计算和设置
  430.         long remainingRounds;
  431.         //HashedWheelTimerBucket槽中的延时任务列表是一个双向链表
  432.         //因为只有workerThread会对它进行操作,所以不需要 synchronization / volatile
  433.         HashedWheelTimeout next;
  434.         HashedWheelTimeout prev;
  435.         //当前延时任务所插入时间轮的哪个槽
  436.         HashedWheelBucket bucket;
  437.         HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
  438.             this.timer = timer;
  439.             this.task = task;
  440.             this.deadline = deadline;
  441.         }
  442.         @Override
  443.         public Timer timer() {
  444.             return timer;
  445.         }
  446.         @Override
  447.         public TimerTask task() {
  448.             return task;
  449.         }
  450.         @Override
  451.         public boolean cancel() {
  452.             //only update the state it will be removed from HashedWheelBucket on next tick.
  453.             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
  454.                 return false;
  455.             }
  456.             //If a task should be canceled we put this to another queue which will be processed on each tick.
  457.             //So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
  458.             //we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
  459.             timer.cancelledTimeouts.add(this);
  460.             return true;
  461.         }
  462.         void remove() {
  463.             HashedWheelBucket bucket = this.bucket;
  464.             if (bucket != null) {
  465.                 bucket.remove(this);
  466.             } else {
  467.                 timer.pendingTimeouts.decrementAndGet();
  468.             }
  469.         }
  470.         public boolean compareAndSetState(int expected, int state) {
  471.             return STATE_UPDATER.compareAndSet(this, expected, state);
  472.         }
  473.         public int state() {
  474.             return state;
  475.         }
  476.         @Override
  477.         public boolean isCancelled() {
  478.             return state() == ST_CANCELLED;
  479.         }
  480.         @Override
  481.         public boolean isExpired() {
  482.             return state() == ST_EXPIRED;
  483.         }
  484.         public void expire() {
  485.             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
  486.                 return;
  487.             }
  488.             try {
  489.                 timer.taskExecutor.execute(this);
  490.             } catch (Throwable t) {
  491.                 if (logger.isWarnEnabled()) {
  492.                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t);
  493.                 }
  494.             }
  495.         }
  496.         @Override
  497.         public void run() {
  498.             try {
  499.                 task.run(this);
  500.             } catch (Throwable t) {
  501.                 if (logger.isWarnEnabled()) {
  502.                     logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
  503.                 }
  504.             }
  505.         }
  506.         @Override
  507.         public String toString() {
  508.             final long currentTime = System.nanoTime();
  509.             long remaining = deadline - currentTime + timer.startTime;
  510.             StringBuilder buf = new StringBuilder(192).append(simpleClassName(this)).append('(').append("deadline: ");
  511.             if (remaining > 0) {
  512.                 buf.append(remaining).append(" ns later");
  513.             } else if (remaining < 0) {
  514.                 buf.append(-remaining).append(" ns ago");
  515.             } else {
  516.                 buf.append("now");
  517.             }
  518.             if (isCancelled()) {
  519.                 buf.append(", cancelled");
  520.             }
  521.             return buf.append(", task: ").append(task()).append(')').toString();
  522.         }
  523.     }
  524.     //存放HashedWheelTimeouts的桶
  525.     //这些数据存储在一个类似于链表的数据结构中,允许轻松删除中间的hashedwheeltimeout
  526.     //HashedWheelTimeout本身作为节点,因此不需要创建额外的对象
  527.     //保存头结点和尾节点,方便于任务的提取和插入
  528.     private static final class HashedWheelBucket {
  529.         //头结点
  530.         private HashedWheelTimeout head;
  531.         //尾节点
  532.         private HashedWheelTimeout tail;
  533.         //Add HashedWheelTimeout to this bucket.
  534.         public void addTimeout(HashedWheelTimeout timeout) {
  535.             assert timeout.bucket == null;
  536.             timeout.bucket = this;
  537.             if (head == null) {
  538.                 head = tail = timeout;
  539.             } else {
  540.                 tail.next = timeout;
  541.                 timeout.prev = tail;
  542.                 tail = timeout;
  543.             }
  544.         }
  545.         //Expire all HashedWheelTimeouts for the given deadline.
  546.         public void expireTimeouts(long deadline) {
  547.             HashedWheelTimeout timeout = head;
  548.             //遍历当前时间槽中的所有任务
  549.             while (timeout != null) {
  550.                 HashedWheelTimeout next = timeout.next;
  551.                 if (timeout.remainingRounds <= 0) {
  552.                     //从链表中移除
  553.                     next = remove(timeout);
  554.                     if (timeout.deadline <= deadline) {
  555.                         //延时任务到期,执行延时任务
  556.                         timeout.expire();
  557.                     } else {
  558.                         //The timeout was placed into a wrong slot. This should never happen.
  559.                         throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
  560.                     }
  561.                     //如果延时任务取消,从链表中移除
  562.                 } else if (timeout.isCancelled()) {
  563.                     next = remove(timeout);
  564.                 } else {
  565.                     //任务还没到期,剩余的轮数-1
  566.                     timeout.remainingRounds --;
  567.                 }
  568.                 //将指针放置到下一个延时任务上
  569.                 timeout = next;
  570.             }
  571.         }
  572.         //删除槽中链表中的延时任务
  573.         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
  574.             HashedWheelTimeout next = timeout.next;
  575.             //remove timeout that was either processed or cancelled by updating the linked-list
  576.             if (timeout.prev != null) {
  577.                 timeout.prev.next = next;
  578.             }
  579.             if (timeout.next != null) {
  580.                 timeout.next.prev = timeout.prev;
  581.             }
  582.             if (timeout == head) {
  583.                 //if timeout is also the tail we need to adjust the entry too
  584.                 if (timeout == tail) {
  585.                     tail = null;
  586.                     head = null;
  587.                 } else {
  588.                     head = next;
  589.                 }
  590.             } else if (timeout == tail) {
  591.                 //if the timeout is the tail modify the tail to be the prev node.
  592.                 tail = timeout.prev;
  593.             }
  594.             //null out prev, next and bucket to allow for GC.
  595.             timeout.prev = null;
  596.             timeout.next = null;
  597.             timeout.bucket = null;
  598.             timeout.timer.pendingTimeouts.decrementAndGet();
  599.             return next;
  600.         }
  601.         //Clear this bucket and return all not expired / cancelled Timeouts.
  602.         public void clearTimeouts(Set<Timeout> set) {
  603.             for (;;) {
  604.                 HashedWheelTimeout timeout = pollTimeout();
  605.                 if (timeout == null) {
  606.                     return;
  607.                 }
  608.                 if (timeout.isExpired() || timeout.isCancelled()) {
  609.                     continue;
  610.                 }
  611.                 set.add(timeout);
  612.             }
  613.         }
  614.         //头结点移除
  615.         private HashedWheelTimeout pollTimeout() {
  616.             HashedWheelTimeout head = this.head;
  617.             if (head == null) {
  618.                 return null;
  619.             }
  620.             HashedWheelTimeout next = head.next;
  621.             if (next == null) {
  622.                 tail = this.head =  null;
  623.             } else {
  624.                 this.head = next;
  625.                 next.prev = null;
  626.             }
  627.             //null out prev and next to allow for GC.
  628.             head.next = null;
  629.             head.prev = null;
  630.             head.bucket = null;
  631.             return head;
  632.         }
  633.     }
  634. }
复制代码

9.HashedWheelTimer的总结


一.时间轮的转动是单线程
但是时间轮中每个时间槽里的延时使命则是由线程池来实行的。

二.延时使命保存到JVM中没有做宕机备份
系统重启时延时使命将会丢失,无法恢复使命进行重新调度。

三.时间轮调度器的时间精度不是很高
对于精度要求特殊高的调度使命可能不太适合,由于时间轮的精度取决于时间格的跨度大小。

四.时间轮指针的转动是使用Sleep来完成等待的

10.HashedWheelTimer的应用


(1)时间轮的应用场景


一.Dubbo、Netty、Kafka、Redission等中间件都用到了时间轮机制
二.订单关闭、确认收货、批量定时数据更新等都可以接纳时间轮机制

(2)心跳检测


心跳机制会每隔固定的时间发送一个心跳包来检测客户端和服务端的连接状态,客户端发送心跳包用来告诉服务器其还正常运行。

好比在Dubbo中,需要有心跳机制来维持Consumer与Provider的长连接,默认的心跳隔断是60s。当Provider在3次心跳时间内没有收到心跳相应,会关闭连接通道。当Consumer在3次心跳时间内没有收到心跳相应,会进行重连。

在Dubbo的HeaderExchangeClient类中会向时间轮中提交该心跳使命:

一.发送心跳的时间轮

  1. private static final HashedWheelTimer IDLE_CHECK_TIMER =
  2.     new HashedWheelTimer(
  3.         new NamedThreadFactory("dubbo-client-idleCheck", true),
  4.         1,
  5.         TimeUnit.SECONDS,
  6.         TICKS_PER_WHEEL
  7.     );
复制代码

二.向时间轮中提交心跳使命

  1. private void startHeartBeatTask(URL url) {
  2.     //Client的具体实现决定是否启动该心跳任务
  3.     if (!client.canHandleIdle()) {
  4.         AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
  5.         //计算心跳间隔, 最小间隔不能低于1s
  6.         int heartbeat = getHeartbeat(url);
  7.         long heartbeatTick = calculateLeastDuration(heartbeat);
  8.         //创建心跳任务
  9.         this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
  10.         //提交到IDLE_CHECK_TIMER这个时间轮中等待执行, 等时间到了时间轮就会去取出该任务进行调度执行
  11.         IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
  12.     }
  13. }
复制代码

(3)超时处理惩罚


在Dubbo中发起RPC调用时,通常会设置超时时间,当消耗者调用服务提供者出现超时进行一定的逻辑处理惩罚。那么怎么检测使命调用超时了呢?我们可以使用定时使命。

每次发起RPC调用时创建一个Future,记载这个Future的创建时间与超时时间,后台有一个定时使命进行检测。当Future到达超时时间而且没有被处理惩罚时,就需要对这个Future实行超时逻辑处理惩罚。

(4)Redisson分布式锁续期


Redisson看门狗机制,通过时间轮定时给分布式锁续期。在获取锁乐成后,Redisson会封装一个锁续期的延时使命放入到时间轮中。默认10秒查抄一下,用于对获取到的锁进行续期,延长持有锁的时间。如果业务机器宕机了,那么续期的延时使命失效,也无法续期,锁会超时开释。

一.添加续期延时使命

  1. private void renewExpiration() {
  2.     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  3.     if (ee == null) {
  4.         return;
  5.     }
  6.     //这边newTimeout点进去发现就是往时间轮中提交了一个任务
  7.     Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  8.         @Override
  9.         public void run(Timeout timeout) throws Exception {
  10.             ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  11.             if (ent == null) {
  12.                 return;
  13.             }
  14.             Long threadId = ent.getFirstThreadId();
  15.             if (threadId == null) {
  16.                 return;
  17.             }
  18.             RFuture<Boolean> future = renewExpirationAsync(threadId);
  19.             future.onComplete((res, e) -> {
  20.                 if (e != null) {
  21.                     log.error("Can't update lock " + getName() + " expiration", e);
  22.                     return;
  23.                 }
  24.                 if (res) {
  25.                     //续期成功后继续调度, 又往时间轮中放一个续期任务
  26.                     renewExpiration();
  27.                 }
  28.             });
  29.         }
  30.     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  31.     ee.setTimeout(task);
  32. }
复制代码

二.lua续期代码

  1. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  2.     //通过lua脚本对锁进行续期
  3.     return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  4.         "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  5.         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  6.         "return 1; " +
  7.         "end; " +
  8.         "return 0;",
  9.         Collections.singletonList(getName()),
  10.         internalLockLeaseTime, getLockName(threadId)
  11.     );
  12. }
复制代码

   文章转载自:东阳马生架构
  原文链接:Netty源码—10.Netty工具之时间轮 - 东阳马生架构 - 博客园
  体验地点:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程计划器_表单引擎_工作流引擎_软件架构

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天津储鑫盛钢材现货供应商

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表