魏晓东 发表于 2022-8-20 05:28:17

深入解析kubernetes controller-runtime

Overview

controller-runtime 是 Kubernetes 社区提供可供快速搭建一套实现了controller 功能的工具,无需自行实现Controller的功能了;在 Kubebuilder与Operator SDK 也是使用 controller-runtime 。本文将对 controller-runtime 的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。
controller-runtime structure

controller-runtime 主要组成是需要用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller 。

[*]Manager:是用户在初始化时创建的,用于启动 Controller Runtime 组件
[*]Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过 code-generator 生成的api-like而实现的controller中的业务处理部分)。
[*]Cache:一个缓存,用来建立 Informer 到 ApiServer 的连接来监听资源并将被监听的对象推送到queue中。
[*]Controller: 一方面向 Informer 注册 eventHandler,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的 Reconciler 功能。
https://img2022.cnblogs.com/blog/1380340/202206/1380340-20220627221136307-385951010.png
图:controller-runtime structurehttps://img2022.cnblogs.com/blog/1380340/202206/1380340-20220627221151306-444229986.png
图:controller-runtime flowchart由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。
Controller引入

我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。
可以看到 example 下的实际上实现了一个 reconciler的结构体,实现了 Reconciler 抽象和 Client 结构体
type reconciler struct {
        client.Client
        scheme *runtime.Scheme
}那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,这个是具体处理的逻辑过程
type Reconciler interface {
        Reconcile(context.Context, Request) (Result, error)
}下面在看下谁来实现了这个 Reconciler 抽象
type Controller interface {
        reconcile.Reconciler // 协调的具体步骤,通过ns/name\
    // 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests)
        Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
    // 启动controller,类似于自定义的Run()
        Start(ctx context.Context) error
        GetLogger() logr.Logger
}controller structure

在 controller-runtime\pkg\internal\controller\controller.go 中实现了这个 Controller
type Controller struct {
        Name string // controller的标识
   
        MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1
        // 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFunc
        Do reconcile.Reconciler
        // makeQueue会构建一个对应的队列,就是返回一个限速队列
        MakeQueue func() workqueue.RateLimitingInterface
        // MakeQueue创造出来的,在出入队列就是操作的这个
        Queue workqueue.RateLimitingInterface

        // 用于注入其他内容
    // 已弃用
        SetFields func(i interface{}) error

        mu sync.Mutex
        // 标识开始的状态
        Started bool
        // 在启动时传递的上下文,用于停止控制器
        ctx context.Context
        // 等待缓存同步的时间 默认2分钟
        CacheSyncTimeout time.Duration

        // 维护了eventHandler predicates,在控制器启动时启动
        startWatches []watchDescription

        // 日志构建器,输出入日志
        LogConstructor func(request *reconcile.Request) logr.Logger

        // RecoverPanic为是否对reconcile引起的panic恢复
        RecoverPanic bool
}看完了controller的structure,接下来看看controller是如何使用的
injection

Controller.Watch 实现了注入的动作,可以看到 watch() 通过参数将 对应的事件函数传入到内部
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
        c.mu.Lock()
        defer c.mu.Unlock()

        // 使用SetFields来完成注入操作
        if err := c.SetFields(src); err != nil {
                return err
        }
        if err := c.SetFields(evthdler); err != nil {
                return err
        }
        for _, pr := range prct {
                if err := c.SetFields(pr); err != nil {
                        return err
                }
        }

        // 如果Controller还未启动,那么将这些动作缓存到本地
        if !c.Started {
                c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
                return nil
        }

        c.LogConstructor(nil).Info("Starting EventSource", "source", src)
        return src.Start(c.ctx, evthdler, c.Queue, prct...)
}启动操作实际上为informer注入事件函数
type Source interface {
        // start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。
        Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
        prct ...predicate.Predicate) error {
        // Informer should have been specified by the user.
        if is.Informer == nil {
                return fmt.Errorf("must specify Informer.Informer")
        }

        is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
        return nil
}我们知道对于 eventHandler,实际上应该是一个 onAdd,onUpdate 这种类型的函数,queue则是workqueue,那么 Predicates 是什么呢?
通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。
// Predicate filters events before enqueuing the keys.
type Predicate interface {
        // Create returns true if the Create event should be processed
        Create(event.CreateEvent) bool

        // Delete returns true if the Delete event should be processed
        Delete(event.DeleteEvent) bool

        // Update returns true if the Update event should be processed
        Update(event.UpdateEvent) bool

        // Generic returns true if the Generic event should be processed
        Generic(event.GenericEvent) bool
}在对应的动作中,可以看到这里作为过滤操作
func (e EventHandler) OnAdd(obj interface{}) {
        c := event.CreateEvent{}

        // Pull Object out of the object
        if o, ok := obj.(client.Object); ok {
                c.Object = o
        } else {
                log.Error(nil, "OnAdd missing Object",
                        "object", obj, "type", fmt.Sprintf("%T", obj))
                return
        }

        for _, p := range e.Predicates {
                if !p.Create(c) {
                        return
                }
        }

        // Invoke create handler
        e.EventHandler.Create(c, e.Queue)
}上面就看到了,对应是 EventHandler.Create 进行添加的,那么这些动作具体是在做什么呢?
在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
        if evt.Object == nil {
                enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
                return
        }
        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
                Name:      evt.Object.GetName(),
                Namespace: evt.Object.GetNamespace(),
        }})
}unqueue

上面看到了,入队的动作实际上都是将 ns/name 加入到队列中,那么出队列时又做了些什么呢?
通过 controller.Start()可以看到controller在启动后都做了些什么动作
func (c *Controller) Start(ctx context.Context) error {
        c.mu.Lock()
        if c.Started {
                return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
        }

        c.initMetrics()

        // Set the internal context.
        c.ctx = ctx

        c.Queue = c.MakeQueue() // 初始化queue
        go func() { // 退出时,让queue关闭
                <-ctx.Done()
                c.Queue.ShutDown()
        }()

        wg := &sync.WaitGroup{}
        err := func() error {
                defer c.mu.Unlock()
                defer utilruntime.HandleCrash()

                // 启动informer前,将之前准备好的 evnetHandle predictates source注册
                for _, watch := range c.startWatches {
                        c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
                                // 上面我们看过了,start就是真正的注册动作
                        if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                                return err
                        }
                }

                // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
                c.LogConstructor(nil).Info("Starting Controller")
               // startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面,
      // 这里是拿出来将其启动
                for _, watch := range c.startWatches {
                        syncingSource, ok := watch.src.(source.SyncingSource)
                        if !ok {
                                continue
                        }

                        if err := func() error {
                                // use a context with timeout for launching sources and syncing caches.
                                sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
                                defer cancel()

                                // WaitForSync waits for a definitive timeout, and returns if there
                                // is an error or a timeout
                                if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
                                        err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
                                        c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
                                        return err
                                }

                                return nil
                        }(); err != nil {
                                return err
                        }
                }

                // which won't be garbage collected if we hold a reference to it.
                c.startWatches = nil

                // Launch workers to process resources
                c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
                wg.Add(c.MaxConcurrentReconciles)
      // 启动controller消费端的线程
                for i := 0; i < c.MaxConcurrentReconciles; i++ {
                        go func() {
                                defer wg.Done()
                                for c.processNextWorkItem(ctx) {
                                }
                        }()
                }

                c.Started = true
                return nil
        }()
        if err != nil {
                return err
        }

        <-ctx.Done() // 阻塞,直到上下文关闭
        c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
        wg.Wait() // 等待所有线程都关闭
        c.LogConstructor(nil).Info("All workers finished")
        return nil
}manager.New()
start Manager

接下来是manager的启动,也就是对应的 start() 与 doWatch()
通过下述代码我们可以看出来,对于 doWatch() 就是把 compete() 前的一些资源的事件函数都注入到controller 中
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
        obj, shutdown := c.Queue.Get() // 从队列中拿取数据
        if shutdown {
                return false
        }

        defer c.Queue.Done(obj)
        // 下面应该是prometheus指标的一些东西
        ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
        defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
        // 获得的对象通过reconcileHandler处理
        c.reconcileHandler(ctx, obj)
        return true
}由于前两部 builder 的操作将 mgr 指针传入到 builder中,并且操作了 complete() ,也就是操作了 build() ,这代表了对 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是将controller启动
func (cm *controllerManager) Start(ctx context.Context) (err error) {        cm.Lock()        if cm.started {                cm.Unlock()                return errors.New("manager already started")        }        var ready bool        defer func() {                if !ready {                        cm.Unlock()                }        }()        // Initialize the internal context.        cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)        // 这个channel代表了controller的停止        stopComplete := make(chan struct{})        defer close(stopComplete)        // This must be deferred after closing stopComplete, otherwise we deadlock.        defer func() {                stopErr := cm.engageStopProcedure(stopComplete)                if stopErr != nil {                        if err != nil {                                err = kerrors.NewAggregate([]error{err, stopErr})                        } else {                                err = stopErr                        }                }        }()        // Add the cluster runnable.        if err := cm.add(cm.cluster); err != nil {                return fmt.Errorf("failed to add cluster to runnables: %w", err)        }    // 指标类        if cm.metricsListener != nil {                cm.serveMetrics()        }        if cm.healthProbeListener != nil {                cm.serveHealthProbes()        }        if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {                if !errors.Is(err, wait.ErrWaitTimeout) {                        return err                }        }        // 等待informer同步完成        if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {                if !errors.Is(err, wait.ErrWaitTimeout) {                        return err                }        }        // 非选举模式,runnable将在cache同步完成后启动        if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {                if !errors.Is(err, wait.ErrWaitTimeout) {                        return err                }        }        // Start the leader election and all required runnables.        {                ctx, cancel := context.WithCancel(context.Background())                cm.leaderElectionCancel = cancel                go func() {                        if cm.resourceLock != nil {                                if err := cm.startLeaderElection(ctx); err != nil {                                        cm.errChan
页: [1]
查看完整版本: 深入解析kubernetes controller-runtime