client-go实战之九:手写一个kubernetes的controller

张春  金牌会员 | 2023-11-14 06:15:05 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 917|帖子 917|积分 2751

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
本篇概览


  • 本文是《client-go实战》系列的第九篇,前面咱们已经了解了client-go的基本功能,现在要来一次经典的综合实战了,接下来咱们会手写一个kubernetes的controller,其功能是:监听某种资源的变化,一旦资源发生变化(例如增加或者删除),apiserver就会有广播发出,controller使用client-go可以订阅这个广播,然后在收到广播后进行各种业务操作,
  • 本次实战代码量略大,但如果随本文一步步先设计再开发,并不会觉得有太多,总的来说由以下内容构成

  • 代码整体架构一览
  • 对着架构细说流程
  • 全局重点的小结
  • 编码实战
代码整体架构一览


  • 首先,再次明确本次实战的目标:开发出类似kubernetes的controller那样的功能,实时监听pod资源的变化,针对每个变化做出响应
  • 今天的实战源自client-go的官方demo,其主要架构如下

  • 可能您会觉得上图有些复杂,没关系,接下来咱们细说此图,为后面的编码打好理论基础
对着架构细说流程


  • 首先将上述架构图中涉及的内容进行分类,共有三部分

  • 最左侧的Kubernetes API Server+etcd是第一部分,它们都是kubernetes的内部组件
  • 第二部分是整个informer,informer是client-go库的核心模块
  • 第三部分是WorkQueue和Conrol Loop,它们都是controller的业务逻辑代码


  • 上面三部分合作,就能做到监听资源变化并做出响应
  • 另外,informer内部很复杂也很精巧,后面会有专门的文章去细说,本篇只会提到与controller有关系的informer细节,其余的能不提就不提(不然内容太多,这篇文章写不完了)
  • 分类完毕后,再来聊流程

  • controller会通过client-go的list&watch机制与API Server建立长连接(http2的stream),只要pod资源发生变化,API Server就会通过长连接推送到controller
  • API Server推的数据到达Reflector,它将数据写入Delta FIFO Queue
  • Delta FIFO Queue是个先入先出的队列,除了pod信息还保存了操作类型(增加、修改、删除),informer内部不断从这个队列获取数据,再执行AddFunc、UpdateFunc、DeleteFunc等方法
  • 完整的pod数据被存放在Local Store中,外部通过Indexer随时可以获取到
  • controller中准备一个或多个工作队列,在执行AddFunc、UpdateFunc、DeleteFunc等方法时,可以将定制化的数据放入工作队列中
  • controller中启动一个或多个协程,持续从工作队列中取数据,执行业务逻辑,执行过程中如果需要pod的详细数据,可以通过indexder获取


  • 差不多了,我有种胸有成竹的感觉,迫不及待想写代码,但还是忍忍吧,先规划再动手
编码规划


  • 所谓规划就是把步骤捋清楚,先写啥再写啥,如下图所示

  • 捋顺了,开始写代码吧
编码之一:定义Controller数据结构(controller.go)

  1. type Controller struct {
  2.         indexer  cache.Indexer
  3.         queue    workqueue.RateLimitingInterface
  4.         informer cache.Controller
  5. }
复制代码

  • 从上述代码可见Controller结构体有三个成员,indexer是informer内负责存取完整资源信息的对象,queue是用于业务逻辑的工作队列
编码之二:编写业务逻辑代码(controller.go)


  • 业务逻辑代码共有四部分

  • 把资源变化信息存入工作队列,这里可能按实际需求定制(例如有的数据不关注就丢弃了)
  • 从工作队列中取出数据
  • 取出数据后的处理逻辑,这边是纯粹的业务需求了,各人的实现都不一样
  • 异常处理


  • 步骤1,存入工作队列的操作,留待初始化informer的时候再做,
  • 步骤4,异常处理稍后也有单独段落细说
  • 这里只聚焦步骤2和3:怎么取,取出后怎么用
  • 先写步骤2的代码:从工作队列中取取数据,用名为processNextItem的方法来实现(对每一行代码进行中文注释着实不易,支持的话请点个赞)
  1. func (c *Controller) processNextItem() bool {
  2.         // 阻塞等待,直到队列中有数据可以被取出,
  3.         // 另外有可能是多协程并发获取数据,此key会被放入processing中,表示正在被处理
  4.         key, quit := c.queue.Get()
  5.         // 如果最外层调用了队列的Shutdown,这里的quit就会返回true,
  6.         // 调用processNextItem的地方发现processNextItem返回false,就不会再次调用processNextItem了
  7.         if quit {
  8.                 return false
  9.         }
  10.         // 表示该key已经被处理完成(从processing中移除)
  11.         defer c.queue.Done(key)
  12.         // 调用业务方法,实现具体的业务需求
  13.         err := c.syncToStdout(key.(string))
  14.         // Handle the error if something went wrong during the execution of the business logic
  15.         // 判断业务逻辑处理是否出现异常,如果出现就重新放入队列,以此实现重试,如果已经重试过5次,就放弃
  16.         c.handleErr(err, key)
  17.         // 调用processNextItem的地方发现processNextItem返回true,就会再次调用processNextItem
  18.         return true
  19. }
复制代码

  • 接下来写业务处理的代码,就是上面调用的syncToStdout方法,常规套路是检查spec和status的差距,然后让status和spec保持一致,(例如spec中指定副本数为2,而status中记录了真实的副本数是1,所以业务处理就是增加一个副本数),这里仅仅是为了展示业务处理代码在哪些,所以就简(fu)化(yan)一些了,只打印pod的名称
  1. func (c *Controller) syncToStdout(key string) error {
  2.         // 根据key从本地存储中获取完整的pod信息
  3.         // 由于有长连接与apiserver保持同步,因此本地的pod信息与kubernetes集群内保持一致
  4.         obj, exists, err := c.indexer.GetByKey(key)
  5.         if err != nil {
  6.                 klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
  7.                 return err
  8.         }
  9.         if !exists {
  10.                 fmt.Printf("Pod %s does not exist anymore\n", key)
  11.         } else {
  12.                 // 这里就是真正的业务逻辑代码了,一般会比较spce和status的差异,然后做出处理使得status与spce保持一致,
  13.                 // 此处为了代码简单仅仅打印一行日志
  14.                 fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
  15.         }
  16.         return nil
  17. }
复制代码
编码之三:编写错误处理代码(controller.go)


  • 回顾前面的processNextItem方法内容,在调用syncToStdout执行完业务逻辑后就立即调用handleErr方法了,此方法的作用是检查syncToStdout的返回值是否有错误,然后做针对性处理
  1. func (c *Controller) handleErr(err error, key interface{}) {
  2.         // 没有错误时的处理逻辑
  3.         if err == nil {
  4.                 // 确认这个key已经被成功处理,在队列中彻底清理掉
  5.                 // 假设之前在处理该key的时候曾报错导致重新进入队列等待重试,那么也会因为这个Forget方法而不再被重试
  6.                 c.queue.Forget(key)
  7.                 return
  8.         }
  9.         // 代码走到这里表示前面执行业务逻辑的时候发生了错误,
  10.         // 检查已经重试的次数,如果不操作5次就继续重试,这里可以根据实际需求定制
  11.         if c.queue.NumRequeues(key) < 5 {
  12.                 klog.Infof("Error syncing pod %v: %v", key, err)
  13.                 c.queue.AddRateLimited(key)
  14.                 return
  15.         }
  16.         // 如果重试超过了5次就彻底放弃了,也像执行成功那样调用Forget做彻底清理(否则就没完没了了)
  17.         c.queue.Forget(key)
  18.         // 向外部报告错误,走通用的错误处理流程
  19.         runtime.HandleError(err)
  20.         klog.Infof("Dropping pod %q out of the queue: %v", key, err)
  21. }
复制代码

  • 好了,和业务有关的代码已经完成,接下来就是搭建controller框架,把基本功能串起来
编码之四:编写Controller主流程(controller.go)


  • 编写一个完整的Controller,最基本的是构造方法,Controller的构造方法也很简单,保存三个重要的成员变量即可
  1. func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
  2.         return &Controller{
  3.                 informer: informer,
  4.                 indexer:  indexer,
  5.                 queue:    queue,
  6.         }
  7. }
复制代码

  • 先定义个名为runWorker的简单方法,里面是个无限循环,只要消费消息的processNextItem方法返回true,就无限循环下去
  1. func (c *Controller) runWorker() {
  2.         for c.processNextItem() {
  3.         }
  4. }
复制代码

  • 然后是Controller主流程代码,简介清晰,启动informer,开始接受apiserver推送,写入工作队列,然后开启无限循环从工作队列取数据并处理
[code]func (c *Controller) Run(workers int, stopCh chan struct{}) {        defer runtime.HandleCrash()        // 只要工作队列的ShutDown方法被调用,processNextItem方法就会返回false,runWorker的无限循环就会结束        defer c.queue.ShutDown()        klog.Info("Starting Pod controller")        // informer的Run方法执行后,就开始接受apiserver推送的资源变更事件,并更新本地存储        go c.informer.Run(stopCh)        // 等待本地存储和apiserver完成同步        if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {                runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))                return        }        // 启动worker,并发从工作队列取数据,然后执行业务逻辑        for i := 0; i < workers; i++ {                go wait.Until(c.runWorker, time.Second, stopCh)        }

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张春

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表