浅析Kubernetes架构之workqueue

打印 上一主题 下一主题

主题 901|帖子 901|积分 2703

通用队列

在kubernetes中,使用go的channel无法满足kubernetes的应用场景,如延迟、限速等;在kubernetes中存在三种队列通用队列 common queue ,延迟队列 delaying queue,和限速队列 rate limiters queue
Inferface

Interface作为所有队列的一个抽象定义
  1. type Interface interface {
  2.         Add(item interface{})
  3.         Len() int
  4.         Get() (item interface{}, shutdown bool)
  5.         Done(item interface{})
  6.         ShutDown()
  7.         ShuttingDown() bool
  8. }
复制代码
Implementation
  1. type Type struct { // 一个work queue
  2.         queue []t // queue用slice做存储
  3.         dirty set // 脏位,定义了需要处理的元素,类似于操作系统,表示已修改但为写入
  4.         processing set // 当前正在处理的元素集合
  5.         cond *sync.Cond
  6.         shuttingDown bool
  7.         metrics queueMetrics
  8.         unfinishedWorkUpdatePeriod time.Duration
  9.         clock                      clock.Clock
  10. }
  11. type empty struct{}
  12. type t interface{} // t queue中的元素
  13. type set map[t]empty // dirty 和 processing中的元素
复制代码
可以看到其中核心属性就是 queue , dirty , processing
延迟队列

在研究优先级队列前,需要对  Heap 有一定的了解,因为delay queue使用了 heap 做延迟队列
Heap

Heap 是基于树属性的特殊数据结构;heap是一种完全二叉树类型,具有两种类型:

  • 如:B 是 A 的子节点,则  \(key(A) \geq key(B)\) 。这就意味着具有最大Key的元素始终位于根节点,这类Heap称为最大堆 MaxHeap
  • 父节点的值小于或等于其左右子节点的值叫做 MinHeap
二叉堆的存储规则:

  • 每个节点包含的元素大于或等于该节点子节点的元素。
  • 树是完全二叉树。
那么下列图片中,那个是堆

heap的实现
实例:向左边添加一个值为42的元素的过程

步骤一:将新元素放入堆中的第一个可用位置。这将使结构保持为完整的二叉树,但它可能不再是堆,因为新元素可能具有比其父元素更大的值。

步骤二:如果新元素的值大于父元素,将新元素与父元素交换,直到达到新元素到根,或者新元素大于等于其父元素的值时将停止

这种过程被称为 向上调整 (reheapification upward)
实例:移除根

步骤一:将根元素复制到用于返回值的变量中,将最深层的最后一个元素复制到根,然后将最后一个节点从树中取出。该元素称为 out-of-place 。

步骤二:而将异位元素与其最大值的子元素交换,并返回在步骤1中保存的值。

这个过程被称为向下调整 (reheapification downward)
优先级队列

优先级队列的行为:

  • 元素被放置在队列中,然后被取出。
  • 优先级队列中的每个元素都有一个关联的数字,称为优先级。
  • 当元素离开优先级队列时,最高优先级的元素最先离开。
如何实现的:

  • 在优先级队列中,heap的每个节点都包含一个元素以及元素的优先级,并且维护树以便它遵循使用元素的优先级来比较节点的堆存储规则:

    • 每个节点包含的元素的优先级大于或等于该节点子元素的优先级。
    • 树是完全二叉树。

  • 实现的代码:golang priorityQueue
Reference
heap
Client-go 的延迟队列

在Kubernetes中对 delaying queue 的设计非常精美,通过使用 heap 实现的延迟队列,加上kubernetes中的通过队列,完成了延迟队列的功能。
  1. // 注释中给了一个hot-loop热循环,通过这个loop实现了delaying
  2. type DelayingInterface interface {
  3.         Interface // 继承了workqueue的功能
  4.         AddAfter(item interface{}, duration time.Duration) // 在time后将内容添加到工作队列中
  5. }
复制代码
具体实现了 DelayingInterface 的实例
[code]type delayingType struct {        Interface // 通用的queue         clock clock.Clock // 对比的时间 ,包含一些定时器的功能            type Clock interface {            PassiveClock                            type PassiveClock interface {                        Now() time.Time                        Since(time.Time) time.Duration                    }            After(time.Duration)  0 {                        if nextReadyAtTimer != nil {                                nextReadyAtTimer.Stop()                        }                        entry := waitingForQueue.Peek().(*waitFor) // 窥视[0]和值                        nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 创建一个定时器                        nextReadyAt = nextReadyAtTimer.C()                }                select {                case

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

商道如狼道

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表