ToB企服应用市场:ToB评测及商务社交产业平台

标题: 浅析Kubernetes架构之workqueue [打印本页]

作者: 商道如狼道    时间: 2022-6-21 11:08
标题: 浅析Kubernetes架构之workqueue
通用队列

在kubernetes中,使用go的channel无法满足kubernetes的应用场景,如延迟、限速等;在kubernetes中存在三种队列通用队列 common queue ,延迟队列 delaying queue,和限速队列 rate limiters queue
Inferface

Interface作为所有队列的一个抽象定义
  1. type Interface interface {
  2.         Add(item interface{})
  3.         Len() int
  4.         Get() (item interface{}, shutdown bool)
  5.         Done(item interface{})
  6.         ShutDown()
  7.         ShuttingDown() bool
  8. }
复制代码
Implementation
  1. type Type struct { // 一个work queue
  2.         queue []t // queue用slice做存储
  3.         dirty set // 脏位,定义了需要处理的元素,类似于操作系统,表示已修改但为写入
  4.         processing set // 当前正在处理的元素集合
  5.         cond *sync.Cond
  6.         shuttingDown bool
  7.         metrics queueMetrics
  8.         unfinishedWorkUpdatePeriod time.Duration
  9.         clock                      clock.Clock
  10. }
  11. type empty struct{}
  12. type t interface{} // t queue中的元素
  13. type set map[t]empty // dirty 和 processing中的元素
复制代码
可以看到其中核心属性就是 queue , dirty , processing
延迟队列

在研究优先级队列前,需要对  Heap 有一定的了解,因为delay queue使用了 heap 做延迟队列
Heap

Heap 是基于树属性的特殊数据结构;heap是一种完全二叉树类型,具有两种类型:
二叉堆的存储规则:
那么下列图片中,那个是堆

heap的实现
实例:向左边添加一个值为42的元素的过程

步骤一:将新元素放入堆中的第一个可用位置。这将使结构保持为完整的二叉树,但它可能不再是堆,因为新元素可能具有比其父元素更大的值。

步骤二:如果新元素的值大于父元素,将新元素与父元素交换,直到达到新元素到根,或者新元素大于等于其父元素的值时将停止

这种过程被称为 向上调整 (reheapification upward)
实例:移除根

步骤一:将根元素复制到用于返回值的变量中,将最深层的最后一个元素复制到根,然后将最后一个节点从树中取出。该元素称为 out-of-place 。

步骤二:而将异位元素与其最大值的子元素交换,并返回在步骤1中保存的值。

这个过程被称为向下调整 (reheapification downward)
优先级队列

优先级队列的行为:
如何实现的:
Reference
heap
Client-go 的延迟队列

在Kubernetes中对 delaying queue 的设计非常精美,通过使用 heap 实现的延迟队列,加上kubernetes中的通过队列,完成了延迟队列的功能。
  1. // 注释中给了一个hot-loop热循环,通过这个loop实现了delaying
  2. type DelayingInterface interface {
  3.         Interface // 继承了workqueue的功能
  4.         AddAfter(item interface{}, duration time.Duration) // 在time后将内容添加到工作队列中
  5. }
复制代码
具体实现了 DelayingInterface 的实例
[code]type delayingType struct {        Interface // 通用的queue         clock clock.Clock // 对比的时间 ,包含一些定时器的功能            type Clock interface {            PassiveClock                            type PassiveClock interface {                        Now() time.Time                        Since(time.Time) time.Duration                    }            After(time.Duration)  0 {                        if nextReadyAtTimer != nil {                                nextReadyAtTimer.Stop()                        }                        entry := waitingForQueue.Peek().(*waitFor) // 窥视[0]和值                        nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 创建一个定时器                        nextReadyAt = nextReadyAtTimer.C()                }                select {                case




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4