深入解析kubernetes controller-runtime

打印 上一主题 下一主题

主题 724|帖子 724|积分 2172

Overview

controller-runtime 是 Kubernetes 社区提供可供快速搭建一套  实现了controller 功能的工具,无需自行实现Controller的功能了;在 Kubebuilder  与  Operator SDK 也是使用 controller-runtime 。本文将对 controller-runtime 的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。
controller-runtime structure

controller-runtime 主要组成是需要用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller 。

  • Manager:是用户在初始化时创建的,用于启动 Controller Runtime 组件
  • Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过 code-generator 生成的api-like而实现的controller中的业务处理部分)。
  • Cache:一个缓存,用来建立 Informer 到 ApiServer 的连接来监听资源并将被监听的对象推送到queue中。
  • Controller: 一方面向 Informer 注册 eventHandler,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的 Reconciler 功能。

图:controller-runtime structure
图:controller-runtime flowchart由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。
Controller引入

我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。
可以看到 example 下的实际上实现了一个 reconciler  的结构体,实现了 Reconciler 抽象和 Client 结构体
  1. type reconciler struct {
  2.         client.Client
  3.         scheme *runtime.Scheme
  4. }
复制代码
那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,这个是具体处理的逻辑过程
  1. type Reconciler interface {
  2.         Reconcile(context.Context, Request) (Result, error)
  3. }
复制代码
下面在看下谁来实现了这个 Reconciler 抽象
  1. type Controller interface {
  2.         reconcile.Reconciler // 协调的具体步骤,通过ns/name\
  3.     // 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests)
  4.         Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
  5.     // 启动controller,类似于自定义的Run()
  6.         Start(ctx context.Context) error
  7.         GetLogger() logr.Logger
  8. }
复制代码
controller structure

controller-runtime\pkg\internal\controller\controller.go 中实现了这个 Controller
  1. type Controller struct {
  2.         Name string // controller的标识
  3.    
  4.         MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1
  5.         // 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFunc
  6.         Do reconcile.Reconciler
  7.         // makeQueue会构建一个对应的队列,就是返回一个限速队列
  8.         MakeQueue func() workqueue.RateLimitingInterface
  9.         // MakeQueue创造出来的,在出入队列就是操作的这个
  10.         Queue workqueue.RateLimitingInterface
  11.         // 用于注入其他内容
  12.     // 已弃用
  13.         SetFields func(i interface{}) error
  14.         mu sync.Mutex
  15.         // 标识开始的状态
  16.         Started bool
  17.         // 在启动时传递的上下文,用于停止控制器
  18.         ctx context.Context
  19.         // 等待缓存同步的时间 默认2分钟
  20.         CacheSyncTimeout time.Duration
  21.         // 维护了eventHandler predicates,在控制器启动时启动
  22.         startWatches []watchDescription
  23.         // 日志构建器,输出入日志
  24.         LogConstructor func(request *reconcile.Request) logr.Logger
  25.         // RecoverPanic为是否对reconcile引起的panic恢复
  26.         RecoverPanic bool
  27. }
复制代码
看完了controller的structure,接下来看看controller是如何使用的
injection

Controller.Watch 实现了注入的动作,可以看到 watch() 通过参数将 对应的事件函数传入到内部
  1. func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
  2.         c.mu.Lock()
  3.         defer c.mu.Unlock()
  4.         // 使用SetFields来完成注入操作
  5.         if err := c.SetFields(src); err != nil {
  6.                 return err
  7.         }
  8.         if err := c.SetFields(evthdler); err != nil {
  9.                 return err
  10.         }
  11.         for _, pr := range prct {
  12.                 if err := c.SetFields(pr); err != nil {
  13.                         return err
  14.                 }
  15.         }
  16.         // 如果Controller还未启动,那么将这些动作缓存到本地
  17.         if !c.Started {
  18.                 c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
  19.                 return nil
  20.         }
  21.         c.LogConstructor(nil).Info("Starting EventSource", "source", src)
  22.         return src.Start(c.ctx, evthdler, c.Queue, prct...)
  23. }
复制代码
启动操作实际上为informer注入事件函数
  1. type Source interface {
  2.         // start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。
  3.         Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
  4. }
  5. func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
  6.         prct ...predicate.Predicate) error {
  7.         // Informer should have been specified by the user.
  8.         if is.Informer == nil {
  9.                 return fmt.Errorf("must specify Informer.Informer")
  10.         }
  11.         is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
  12.         return nil
  13. }
复制代码
我们知道对于 eventHandler,实际上应该是一个 onAdd,onUpdate 这种类型的函数,queue则是workqueue,那么 Predicates 是什么呢?
通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。
  1. // Predicate filters events before enqueuing the keys.
  2. type Predicate interface {
  3.         // Create returns true if the Create event should be processed
  4.         Create(event.CreateEvent) bool
  5.         // Delete returns true if the Delete event should be processed
  6.         Delete(event.DeleteEvent) bool
  7.         // Update returns true if the Update event should be processed
  8.         Update(event.UpdateEvent) bool
  9.         // Generic returns true if the Generic event should be processed
  10.         Generic(event.GenericEvent) bool
  11. }
复制代码
在对应的动作中,可以看到这里作为过滤操作
  1. func (e EventHandler) OnAdd(obj interface{}) {
  2.         c := event.CreateEvent{}
  3.         // Pull Object out of the object
  4.         if o, ok := obj.(client.Object); ok {
  5.                 c.Object = o
  6.         } else {
  7.                 log.Error(nil, "OnAdd missing Object",
  8.                         "object", obj, "type", fmt.Sprintf("%T", obj))
  9.                 return
  10.         }
  11.         for _, p := range e.Predicates {
  12.                 if !p.Create(c) {
  13.                         return
  14.                 }
  15.         }
  16.         // Invoke create handler
  17.         e.EventHandler.Create(c, e.Queue)
  18. }
复制代码
上面就看到了,对应是 EventHandler.Create 进行添加的,那么这些动作具体是在做什么呢?
在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。
  1. func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
  2.         if evt.Object == nil {
  3.                 enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
  4.                 return
  5.         }
  6.         q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
  7.                 Name:      evt.Object.GetName(),
  8.                 Namespace: evt.Object.GetNamespace(),
  9.         }})
  10. }
复制代码
unqueue

上面看到了,入队的动作实际上都是将 ns/name 加入到队列中,那么出队列时又做了些什么呢?
通过 controller.Start()  可以看到controller在启动后都做了些什么动作
  1. func (c *Controller) Start(ctx context.Context) error {
  2.         c.mu.Lock()
  3.         if c.Started {
  4.                 return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
  5.         }
  6.         c.initMetrics()
  7.         // Set the internal context.
  8.         c.ctx = ctx
  9.         c.Queue = c.MakeQueue() // 初始化queue
  10.         go func() { // 退出时,让queue关闭
  11.                 <-ctx.Done()
  12.                 c.Queue.ShutDown()
  13.         }()
  14.         wg := &sync.WaitGroup{}
  15.         err := func() error {
  16.                 defer c.mu.Unlock()
  17.                 defer utilruntime.HandleCrash()
  18.                 // 启动informer前,将之前准备好的 evnetHandle predictates source注册
  19.                 for _, watch := range c.startWatches {
  20.                         c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
  21.                                 // 上面我们看过了,start就是真正的注册动作
  22.                         if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
  23.                                 return err
  24.                         }
  25.                 }
  26.                 // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
  27.                 c.LogConstructor(nil).Info("Starting Controller")
  28.                  // startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面,
  29.         // 这里是拿出来将其启动
  30.                 for _, watch := range c.startWatches {
  31.                         syncingSource, ok := watch.src.(source.SyncingSource)
  32.                         if !ok {
  33.                                 continue
  34.                         }
  35.                         if err := func() error {
  36.                                 // use a context with timeout for launching sources and syncing caches.
  37.                                 sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
  38.                                 defer cancel()
  39.                                 // WaitForSync waits for a definitive timeout, and returns if there
  40.                                 // is an error or a timeout
  41.                                 if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
  42.                                         err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
  43.                                         c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
  44.                                         return err
  45.                                 }
  46.                                 return nil
  47.                         }(); err != nil {
  48.                                 return err
  49.                         }
  50.                 }
  51.                 // which won't be garbage collected if we hold a reference to it.
  52.                 c.startWatches = nil
  53.                 // Launch workers to process resources
  54.                 c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
  55.                 wg.Add(c.MaxConcurrentReconciles)
  56.         // 启动controller消费端的线程
  57.                 for i := 0; i < c.MaxConcurrentReconciles; i++ {
  58.                         go func() {
  59.                                 defer wg.Done()
  60.                                 for c.processNextWorkItem(ctx) {
  61.                                 }
  62.                         }()
  63.                 }
  64.                 c.Started = true
  65.                 return nil
  66.         }()
  67.         if err != nil {
  68.                 return err
  69.         }
  70.         <-ctx.Done() // 阻塞,直到上下文关闭
  71.         c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
  72.         wg.Wait() // 等待所有线程都关闭
  73.         c.LogConstructor(nil).Info("All workers finished")
  74.         return nil
  75. }
复制代码
manager.New()
start Manager

接下来是manager的启动,也就是对应的 start() 与 doWatch()
通过下述代码我们可以看出来,对于 doWatch() 就是把 compete() 前的一些资源的事件函数都注入到controller 中
  1. func (c *Controller) processNextWorkItem(ctx context.Context) bool {
  2.         obj, shutdown := c.Queue.Get() // 从队列中拿取数据
  3.         if shutdown {
  4.                 return false
  5.         }
  6.         defer c.Queue.Done(obj)
  7.         // 下面应该是prometheus指标的一些东西
  8.         ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
  9.         defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
  10.         // 获得的对象通过reconcileHandler处理
  11.         c.reconcileHandler(ctx, obj)
  12.         return true
  13. }
复制代码
由于前两部 builder 的操作将 mgr 指针传入到 builder中,并且操作了 complete() ,也就是操作了 build() ,这代表了对 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是将controller启动
[code]func (cm *controllerManager) Start(ctx context.Context) (err error) {        cm.Lock()        if cm.started {                cm.Unlock()                return errors.New("manager already started")        }        var ready bool        defer func() {                if !ready {                        cm.Unlock()                }        }()        // Initialize the internal context.        cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)        // 这个channel代表了controller的停止        stopComplete := make(chan struct{})        defer close(stopComplete)        // This must be deferred after closing stopComplete, otherwise we deadlock.        defer func() {                stopErr := cm.engageStopProcedure(stopComplete)                if stopErr != nil {                        if err != nil {                                err = kerrors.NewAggregate([]error{err, stopErr})                        } else {                                err = stopErr                        }                }        }()        // Add the cluster runnable.        if err := cm.add(cm.cluster); err != nil {                return fmt.Errorf("failed to add cluster to runnables: %w", err)        }    // 指标类        if cm.metricsListener != nil {                cm.serveMetrics()        }        if cm.healthProbeListener != nil {                cm.serveHealthProbes()        }        if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {                if !errors.Is(err, wait.ErrWaitTimeout) {                        return err                }        }        // 等待informer同步完成        if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {                if !errors.Is(err, wait.ErrWaitTimeout) {                        return err                }        }        // 非选举模式,runnable将在cache同步完成后启动        if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {                if !errors.Is(err, wait.ErrWaitTimeout) {                        return err                }        }        // Start the leader election and all required runnables.        {                ctx, cancel := context.WithCancel(context.Background())                cm.leaderElectionCancel = cancel                go func() {                        if cm.resourceLock != nil {                                if err := cm.startLeaderElection(ctx); err != nil {                                        cm.errChan

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

魏晓东

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表