ToB企服应用市场:ToB评测及商务社交产业平台
标题:
浅析Kubernetes架构之workqueue
[打印本页]
作者:
商道如狼道
时间:
2022-6-21 11:08
标题:
浅析Kubernetes架构之workqueue
通用队列
在kubernetes中,使用go的channel无法满足kubernetes的应用场景,如延迟、限速等;在kubernetes中存在三种队列通用队列 common queue ,延迟队列 delaying queue,和限速队列 rate limiters queue
Inferface
Interface作为所有队列的一个抽象定义
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
复制代码
Implementation
type Type struct { // 一个work queue
queue []t // queue用slice做存储
dirty set // 脏位,定义了需要处理的元素,类似于操作系统,表示已修改但为写入
processing set // 当前正在处理的元素集合
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
type t interface{} // t queue中的元素
type set map[t]empty // dirty 和 processing中的元素
复制代码
可以看到其中核心属性就是 queue , dirty , processing
延迟队列
在研究优先级队列前,需要对 Heap 有一定的了解,因为delay queue使用了 heap 做延迟队列
Heap
Heap 是基于树属性的特殊数据结构;heap是一种完全二叉树类型,具有两种类型:
如:B 是 A 的子节点,则 \(key(A) \geq key(B)\) 。这就意味着具有最大Key的元素始终位于根节点,这类Heap称为最大堆
MaxHeap
。
父节点的值小于或等于其左右子节点的值叫做
MinHeap
二叉堆的存储规则:
每个节点包含的元素大于或等于该节点子节点的元素。
树是完全二叉树。
那么下列图片中,那个是堆
heap的实现
实例:向左边添加一个值为42的元素的过程
步骤一
:将新元素放入堆中的第一个可用位置。这将使结构保持为完整的二叉树,但它可能不再是堆,因为新元素可能具有比其父元素更大的值。
步骤二
:如果新元素的值大于父元素,将新元素与父元素交换,直到达到新元素到根,或者新元素大于等于其父元素的值时将停止
这种过程被称为
向上调整
(reheapification upward)
实例:移除根
步骤一
:将根元素复制到用于返回值的变量中,将最深层的最后一个元素复制到根,然后将最后一个节点从树中取出。该元素称为 out-of-place 。
步骤二
:而将异位元素与其最大值的子元素交换,并返回在步骤1中保存的值。
这个过程被称为
向下调整
(reheapification downward)
优先级队列
优先级队列的行为:
元素被放置在队列中,然后被取出。
优先级队列中的每个元素都有一个关联的数字,称为优先级。
当元素离开优先级队列时,最高优先级的元素最先离开。
如何实现的:
在优先级队列中,heap的每个节点都包含一个元素以及元素的优先级,并且维护树以便它遵循使用元素的优先级来比较节点的堆存储规则:
每个节点包含的元素的优先级大于或等于该节点子元素的优先级。
树是完全二叉树。
实现的代码:
golang priorityQueue
Reference
heap
Client-go 的延迟队列
在Kubernetes中对 delaying queue 的设计非常精美,通过使用 heap 实现的延迟队列,加上kubernetes中的通过队列,完成了延迟队列的功能。
// 注释中给了一个hot-loop热循环,通过这个loop实现了delaying
type DelayingInterface interface {
Interface // 继承了workqueue的功能
AddAfter(item interface{}, duration time.Duration) // 在time后将内容添加到工作队列中
}
复制代码
具体实现了 DelayingInterface 的实例
[code]type delayingType struct { Interface // 通用的queue clock clock.Clock // 对比的时间 ,包含一些定时器的功能 type Clock interface { PassiveClock type PassiveClock interface { Now() time.Time Since(time.Time) time.Duration } After(time.Duration) 0 { if nextReadyAtTimer != nil { nextReadyAtTimer.Stop() } entry := waitingForQueue.Peek().(*waitFor) // 窥视[0]和值 nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 创建一个定时器 nextReadyAt = nextReadyAtTimer.C() } select { case
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4