Netty源码—10.Netty工具之时间轮一

打印 上一主题 下一主题

主题 1721|帖子 1721|积分 5163

大纲
1.什么是时间轮
2.HashedWheelTimer是什么
3.HashedWheelTimer的使用
4.HashedWheelTimer的运行流程
5.HashedWheelTimer的核心字段
6.HashedWheelTimer的构造方法
7.HashedWheelTimer添加任务和实行任务
8.HashedWheelTimer的完备源码
9.HashedWheelTimer的总结
10.HashedWheelTimer的应用

1.什么是时间轮
简朴来说,时间轮是一个高效利用线程资源举行批量化调理的调理器。首先把大批量的调理任务全部绑定到同一个调理器上,然后使用这个调理器对所有任务举行管理、触发、以及运行,所以时间轮能高效管理各种延时任务、周期任务、关照任务。

时间轮是以时间作为刻度构成的一个环形队列,所以叫做时间轮。这个环形队列通过一个HashedWheelBucket[]数组来实现,数组的每个元素称为槽,每个槽可以存放一个定时任务列表,叫HashedWheelBucket。HashedWheelBucket是一个双向链表,链表的每个节点表示一个定时任务项HashedWheelTimeout。在HashedWheelTimeout中封装了真正的定时任务TimerTask。

时间轮由多个时间格构成,每个时间格代表当前时间轮的基本时间跨度ticketDuration,此中时间轮的时间格的个数是固定的。

如下图示,有16个时间格(槽),假设每个时间格的单位是1s,那么整个时间轮走完一圈需要16s。每秒钟(即时间格的单位也可以为1ms、1min、1h等)指针会沿着顺时针方向转动一格。通过指针移动来获得每个时间格中的任务列表,然后遍历这个时间格内的双向链表的每个任务并实行,依此循环。



2.HashedWheelTimer是什么
Netty的HashedWheelTimer是一个粗略的定时器实现,之所以称为粗略的实现是因为该时间轮并没有严酷准时地实行定时任务,而是在每隔一个时间间隔之后的时间节点实行,并实行当前时间节点之前到期的定时任务。

不过具体的定时任务的时间实行精度,可以通过调节HashedWheelTimer构造方法的时间间隔的巨细来举行调节。在大多数网络应用的情况下,由于IO耽误的存在,所以并不会严酷要求具体的时间实行精度。因此默认100ms的时间间隔可以满足大多数情况,不需要再花精力去调节该时间精度。

3.HashedWheelTimer的使用
  1. public class HashedWheelTimerTest {
  2.     //构建HashedWheelTimer时间轮
  3.     //最后通过HASHED_WHEEL_TIMER.newTimeout()方法把需要延迟执行的任务添加到时间轮中
  4.     private static final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(
  5.         new DefaultThreadFactory("demo-timer"),//threadFactory参数表示创建处理任务的线程工厂
  6.         100,//tickDuration参数表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格
  7.         TimeUnit.MILLISECONDS,
  8.         512,//ticksPerWheel参数表示时间轮上一共有多少个时间格,分配的时间格越多,占用内存空间就越大,这里是512
  9.         true//leakDetection参数表示是否开启内存泄漏检测
  10.     );
  11.     public static void main(String[] args) {
  12.         System.out.println("延时任务提交");
  13.         //延时多久执行
  14.         long delay = 10L;
  15.         HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
  16.             @Override
  17.             public void run(Timeout timeout) throws Exception {
  18.                 System.out.println("延时任务触发");
  19.             }
  20.         }, delay, TimeUnit.SECONDS);
  21.     }
  22. }
复制代码
4.HashedWheelTimer的运行流程
步调一:初始化时间轮
步调二:启动时间轮
步调三:添加任务,保存到延时任务队列
步调四:时间轮指针休眠阻塞,实现转动
步调五:休眠结束,指针指向下一个时间格(槽)
步调六:将已经取消的任务从对应的槽中移除
步调七:将延时任务队列的任务添加到对应的槽中
步调八:实行时间轮指针指向当前槽的到期任务



5.HashedWheelTimer中的关键字段
字段一:wheel
wheel是一个HashedWheelBucket数组,默认的数组巨细是512。可以认为wheel是一个TimerTask的哈希表,它的哈希函数是任务的截止日期。所以每个时间轮的时间格数ticksPerWheel默认是512。

字段二:tickDuration
时间格跨度,默认100ms。

字段三:ticksPerWheel
时间轮的格子数,默认512。

字段四:maxPendingTimeouts
时间轮中任务的最大数量。

字段五:deadline
延时任务的截止时间,值为当前时间 + 延时任务的延时时间 - 时间轮启动时间。

字段六:tick
时间轮启动以来指针总的转动次数。

字段七:remainingRounds
槽中延时任务剩余的圈(轮)数,为0时则表示需要实行延时任务了。

6.HashedWheelTimer的构造方法
步调一:构造参数校验及给实际实行延时任务的线程池taskExecutor赋值
步调二:将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂
步调三:初始化HashedWheelBucket数组wheel
步调四:校验tickDuration和ticksPerWheel
步调五:创建工作线程workerThread,用于指针转动和触发实行时间格里的延时任务
步调六:给时间轮中延时任务的最大数量maxPendingTimeouts赋值
步调七:检查HashedWheelTimer的实例数量,如果大于64则打印error日记


  1. //4.1.73.Final
  2. public class HashedWheelTimer implements Timer {
  3.     private final HashedWheelBucket[] wheel;
  4.     private final int mask;
  5.     private final long tickDuration;
  6.     private final Thread workerThread;
  7.     private final ResourceLeakTracker<HashedWheelTimer> leak;
  8.     private final Worker worker = new Worker();
  9.     private final long maxPendingTimeouts;
  10.     private final Executor taskExecutor;
  11.     ...
  12.     //Creates a new timer.
  13.     //@param threadFactory        创建线程的工厂
  14.     //@param tickDuration         每格的时间间隔,默认100ms,0.1秒
  15.     //@param unit                 时间单位,默认为毫秒
  16.     //@param ticksPerWheel        时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算
  17.     //@param leakDetection        如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略
  18.     //@param maxPendingTimeouts   最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1
  19.     //@param taskExecutor         任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它
  20.     //@throws NullPointerException     if either of threadFactory and unit is null
  21.     //@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0
  22.     public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) {
  23.         //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值
  24.         checkNotNull(threadFactory, "threadFactory");
  25.         checkNotNull(unit, "unit");
  26.         checkPositive(tickDuration, "tickDuration");
  27.         checkPositive(ticksPerWheel, "ticksPerWheel");
  28.         this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
  29.         //2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算
  30.         //3.初始化时间轮wheel
  31.         wheel = createWheel(ticksPerWheel);
  32.         //mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能
  33.         mask = wheel.length - 1;
  34.       
  35.         //4.校验tickDuration和ticksPerWheel
  36.         //Convert tickDuration to nanos.
  37.         long duration = unit.toNanos(tickDuration);
  38.         //防止溢出
  39.         //tickDuration * ticksPerWheel必须小于Long.MAX_VALUE
  40.         if (duration >= Long.MAX_VALUE / wheel.length) {
  41.             throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));
  42.         }
  43.         //tickDuration不能小于1ms
  44.         if (duration < MILLISECOND_NANOS) {
  45.             logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);
  46.             this.tickDuration = MILLISECOND_NANOS;
  47.         } else {
  48.             this.tickDuration = duration;
  49.         }
  50.         //5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行
  51.         workerThread = threadFactory.newThread(worker);
  52.         leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
  53.         //6.给时间轮中任务的最大数量maxPendingTimeouts赋值
  54.         this.maxPendingTimeouts = maxPendingTimeouts;
  55.         //7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志
  56.         if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
  57.             WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
  58.             reportTooManyInstances();
  59.         }
  60.     }
  61.    
  62.     //初始化时间轮环形数组
  63.     //@param ticksPerWheel
  64.     private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
  65.         //ticksPerWheel不能大于2^30
  66.         checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");
  67.         //将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂
  68.         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
  69.         //创建时间轮环形数组
  70.         HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
  71.         for (int i = 0; i < wheel.length; i ++) {
  72.             wheel[i] = new HashedWheelBucket();
  73.         }
  74.         return wheel;
  75.     }
  76.     ...
  77. }
复制代码

7.HashedWheelTimer添加任务和实行任务
(1)添加延时任务
(2)实行延时任务

(1)添加延时任务
步调一:将需要实行的延时任务数pendingTimeouts+1
步调二:如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
步调三:启动工作线程,也就是启动时间轮
步调四:盘算被添加的延时任务的截止时间=当前时间+当前任务实行的耽误时间-时间轮启动的时间
步调五:创建延时任务实例HashedWheelTimeout
步调六:将延时任务实例添加到延时任务队列timeouts中


注意:添加时会将延时任务添加到延时任务队列timeouts中。这个延时任务队列timeouts将会在下一个滴答声中举行处置惩罚(指针的下一次转动)。在处置惩罚过程中,所有列队的HashedWheelTimeout将被添加到正确的HashedWheelBucket。
  1. public class HashedWheelTimer implements Timer {
  2.     private final AtomicLong pendingTimeouts = new AtomicLong(0);//需要执行的延时任务数
  3.     private final long maxPendingTimeouts;
  4.     private volatile long startTime;
  5.     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
  6.     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();//延时任务队列
  7.     ...
  8.     //添加延时任务
  9.     //@param task 任务
  10.     //@param delay 延时时间
  11.     //@param unit 延时时间单位
  12.     @Override
  13.     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
  14.         checkNotNull(task, "task");
  15.         checkNotNull(unit, "unit");
  16.         //1.将需要执行的延时任务数pendingTimeouts + 1
  17.         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
  18.       
  19.         //2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常
  20.         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
  21.             pendingTimeouts.decrementAndGet();
  22.             throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
  23.                 + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")");
  24.         }
  25.       
  26.         //3.启动工作线程,即启动时间轮
  27.         start();
  28.      
  29.         //将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)
  30.         //在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket
  31.         //4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间
  32.         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
  33.         if (delay > 0 && deadline < 0) {
  34.             deadline = Long.MAX_VALUE;
  35.         }
  36.       
  37.         //5.创建延时任务实例HashedWheelTimeout
  38.         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
  39.       
  40.         //6.将延时任务实例添加到延时任务队列中
  41.         timeouts.add(timeout);
  42.         return timeout;
  43.     }
  44.    
  45.     //Starts the background thread explicitly.  
  46.     //The background thread will start automatically on demand even if you did not call this method.
  47.     //@throws IllegalStateException if this timer has been #stop() stopped already
  48.     public void start() {
  49.         switch (WORKER_STATE_UPDATER.get(this)) {
  50.         case WORKER_STATE_INIT:
  51.             if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
  52.                 //启动工作线程,即启动时间轮
  53.                 workerThread.start();
  54.             }
  55.             break;
  56.         case WORKER_STATE_STARTED:
  57.             break;
  58.         case WORKER_STATE_SHUTDOWN:
  59.             throw new IllegalStateException("cannot be started once stopped");
  60.         default:
  61.             throw new Error("Invalid WorkerState");
  62.         }
  63.         //Wait until the startTime is initialized by the worker.
  64.         while (startTime == 0) {
  65.             try {
  66.                 //阻塞时间轮的工作线程
  67.                 startTimeInitialized.await();
  68.             } catch (InterruptedException ignore) {
  69.                 //Ignore - it will be ready very soon.
  70.             }
  71.         }
  72.     }
  73.     ...
  74. }
复制代码
(2)实行延时任务
步调一:纪录时间轮启动的时间startTime
步调二:开始do while循环,唤醒被阻塞的start()方法,关照时间轮已经启动完毕
步调三:阻塞等待下一次指针转动的时间
步调四:盘算当前指针指向的时间轮的槽位idx
步调五:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
步调六:获取当前指针指向的时间槽HashedWheelBucket
步调七:遍历延时任务队列timeouts,将此中的延时任务保存到对应的槽的链表中,根据延时时间盘算对应的时间槽和remainingRounds圈数
步调八:运行目前指针指向的时间槽中的链表的任务,通过taskExecutor线程池去实行到期的任务
步调九:到期的和取消的延时任务从链表中移除并将pendingTimeouts--
步调十:时间轮指针的总转动次数tick++,继续do while循环
步调十一:扫除时间轮中不需要处置惩罚的任务,保存到unprocessedTimeouts中
步调十二:将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中
步调十三:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1

  1. public class HashedWheelTimer implements Timer {
  2.     private volatile long startTime;
  3.     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
  4.    
  5.     ...
  6.     //指针转动和执行延时任务的线程
  7.     private final class Worker implements Runnable {
  8.         //用于记录未执行的延时任务
  9.         private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
  10.    
  11.         //总的tick数(指针嘀嗒的次数)
  12.         private long tick;
  13.    
  14.         @Override
  15.         public void run() {
  16.             //1.记录时间轮启动的时间startTime
  17.             startTime = System.nanoTime();
  18.             if (startTime == 0) {
  19.                 //我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0
  20.                 startTime = 1;
  21.             }
  22.    
  23.             //2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕
  24.             startTimeInitialized.countDown();
  25.         
  26.             //一直执行do while循环,直到时间轮被关闭
  27.             do {
  28.                 //3.阻塞等待下一次指针转动的时间
  29.                 //这里会休眠tick的时间,模拟指针走动
  30.                 final long deadline = waitForNextTick();
  31.                 if (deadline > 0) {
  32.                     //4.计算当前指针指向的时间轮槽位idx
  33.                     int idx = (int) (tick & mask);
  34.                     //5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
  35.                     processCancelledTasks();
  36.                     //6.获取当前指针指向的时间槽HashedWheelBucket
  37.                     HashedWheelBucket bucket = wheel[idx];
  38.                     //7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
  39.                     transferTimeoutsToBuckets();
  40.                     //8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务
  41.                     //9.到期的和取消的延时任务从链表中移除并将pendingTimeouts--
  42.                     bucket.expireTimeouts(deadline);
  43.                     //10.时间轮指针的总转动次数tick++
  44.                     tick++;
  45.                 }
  46.             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
  47.    
  48.             //Fill the unprocessedTimeouts so we can return them from stop() method.
  49.             //11.清除时间轮中不需要处理的任务
  50.             for (HashedWheelBucket bucket: wheel) {
  51.                 bucket.clearTimeouts(unprocessedTimeouts);
  52.             }
  53.             //12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中
  54.             //遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中
  55.             for (;;) {
  56.                 HashedWheelTimeout timeout = timeouts.poll();
  57.                 if (timeout == null) {
  58.                     break;
  59.                 }
  60.                 if (!timeout.isCancelled()) {
  61.                     //如果延时任务没被取消,记录到未执行的任务Set集合中
  62.                     unprocessedTimeouts.add(timeout);
  63.                 }
  64.             }
  65.             //13.处理被取消的任务
  66.             processCancelledTasks();
  67.         }
  68.    
  69.         //将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置
  70.         //也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中
  71.         private void transferTimeoutsToBuckets() {
  72.             //每次转移10w个延时任务
  73.             for (int i = 0; i < 100000; i++) {
  74.                 //从队列中出队一个延时任务
  75.                 HashedWheelTimeout timeout = timeouts.poll();
  76.                 if (timeout == null) {
  77.                     //all processed
  78.                     break;
  79.                 }
  80.                 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
  81.                     //Was cancelled in the meantime.
  82.                     continue;
  83.                 }
  84.                 //到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔
  85.                 long calculated = timeout.deadline / tickDuration;
  86.                 //tick已经走了的时间格,到期一共还需要需要走多少圈
  87.                 timeout.remainingRounds = (calculated - tick) / wheel.length;
  88.                 //如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行
  89.                 final long ticks = Math.max(calculated, tick);
  90.                 //槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1
  91.                 int stopIndex = (int) (ticks & mask);
  92.                 //根据索引该任务应该放到的槽
  93.                 HashedWheelBucket bucket = wheel[stopIndex];
  94.                 //将任务添加到槽中,链表末尾
  95.                 bucket.addTimeout(timeout);
  96.             }
  97.         }
  98.    
  99.         //处理取消掉的延时任务
  100.         //将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1
  101.         private void processCancelledTasks() {
  102.             for (;;) {
  103.                 HashedWheelTimeout timeout = cancelledTimeouts.poll();
  104.                 if (timeout == null) {
  105.                     //all processed
  106.                     break;
  107.                 }
  108.                 try {
  109.                     timeout.remove();
  110.                 } catch (Throwable t) {
  111.                     if (logger.isWarnEnabled()) {
  112.                         logger.warn("An exception was thrown while process a cancellation task", t);
  113.                     }
  114.                 }
  115.             }
  116.         }
  117.    
  118.         //从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来
  119.         private long waitForNextTick() {
  120.             //deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔
  121.             long deadline = tickDuration * (tick + 1);
  122.    
  123.             for (;;) {
  124.                 //计算当前时间距离启动时间的时间间隔
  125.                 final long currentTime = System.nanoTime() - startTime;
  126.                 //距离下一次指针跳动还需休眠多长时间
  127.                 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
  128.                 //到了指针调到下一个槽位的时间
  129.                 if (sleepTimeMs <= 0) {
  130.                     if (currentTime == Long.MIN_VALUE) {
  131.                         return -Long.MAX_VALUE;
  132.                     } else {
  133.                         return currentTime;
  134.                     }
  135.                 }
  136.    
  137.                 try {
  138.                     //表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来
  139.                     Thread.sleep(sleepTimeMs);
  140.                 } catch (InterruptedException ignored) {
  141.                     if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
  142.                         return Long.MIN_VALUE;
  143.                     }
  144.                 }
  145.             }
  146.         }
  147.    
  148.         //记录未执行的延时任务
  149.         public Set<Timeout> unprocessedTimeouts() {
  150.             return Collections.unmodifiableSet(unprocessedTimeouts);
  151.         }
  152.     }
  153.    
  154.     private static final class HashedWheelBucket {
  155.         ...
  156.         public void expireTimeouts(long deadline) {
  157.             HashedWheelTimeout timeout = head;
  158.         
  159.             //process all timeouts
  160.             while (timeout != null) {
  161.                 HashedWheelTimeout next = timeout.next;
  162.                 if (timeout.remainingRounds <= 0) {
  163.                     next = remove(timeout);
  164.                     if (timeout.deadline <= deadline) {
  165.                         //通过线程池执行任务
  166.                         timeout.expire();
  167.                     } else {
  168.                         //The timeout was placed into a wrong slot. This should never happen.
  169.                         throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
  170.                     }
  171.                 } else if (timeout.isCancelled()) {
  172.                     next = remove(timeout);
  173.                 } else {
  174.                     timeout.remainingRounds --;
  175.                 }
  176.                 timeout = next;
  177.             }
  178.         }
  179.         public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
  180.             HashedWheelTimeout next = timeout.next;
  181.             //remove timeout that was either processed or cancelled by updating the linked-list
  182.             if (timeout.prev != null) {
  183.                 timeout.prev.next = next;
  184.             }
  185.             if (timeout.next != null) {
  186.                 timeout.next.prev = timeout.prev;
  187.             }
  188.             if (timeout == head) {
  189.                 //if timeout is also the tail we need to adjust the entry too
  190.                 if (timeout == tail) {
  191.                     tail = null;
  192.                     head = null;
  193.                 } else {
  194.                     head = next;
  195.                 }
  196.             } else if (timeout == tail) {
  197.                 //if the timeout is the tail modify the tail to be the prev node.
  198.                 tail = timeout.prev;
  199.             }
  200.             //null out prev, next and bucket to allow for GC.
  201.             timeout.prev = null;
  202.             timeout.next = null;
  203.             timeout.bucket = null;
  204.             timeout.timer.pendingTimeouts.decrementAndGet();
  205.             return next;
  206.         }
  207.         ...   
  208.     }
  209.    
  210.     private static final class HashedWheelTimeout implements Timeout, Runnable {
  211.         private final TimerTask task;
  212.         private final HashedWheelTimer timer;
  213.         ...
  214.         public void expire() {
  215.             if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
  216.                 return;
  217.             }
  218.             try {
  219.                 timer.taskExecutor.execute(this);
  220.             } catch (Throwable t) {
  221.                 if (logger.isWarnEnabled()) {
  222.                     logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t);
  223.                 }
  224.             }
  225.         }
  226.         ...
  227.     }
  228.     ...
  229. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曂沅仴駦

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表