Overview
controller-runtime 是 Kubernetes 社区提供可供快速搭建一套 实现了controller 功能的工具,无需自行实现Controller的功能了;在 Kubebuilder 与 Operator SDK 也是使用 controller-runtime 。本文将对 controller-runtime 的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。
controller-runtime structure
controller-runtime 主要组成是需要用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller 。
- Manager:是用户在初始化时创建的,用于启动 Controller Runtime 组件
- Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过 code-generator 生成的api-like而实现的controller中的业务处理部分)。
- Cache:一个缓存,用来建立 Informer 到 ApiServer 的连接来监听资源并将被监听的对象推送到queue中。
- Controller: 一方面向 Informer 注册 eventHandler,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的 Reconciler 功能。
图:controller-runtime structure
图:controller-runtime flowchart由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。
Controller引入
我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。
可以看到 example 下的实际上实现了一个 reconciler 的结构体,实现了 Reconciler 抽象和 Client 结构体- type reconciler struct {
- client.Client
- scheme *runtime.Scheme
- }
复制代码 那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,这个是具体处理的逻辑过程- type Reconciler interface {
- Reconcile(context.Context, Request) (Result, error)
- }
复制代码 下面在看下谁来实现了这个 Reconciler 抽象- type Controller interface {
- reconcile.Reconciler // 协调的具体步骤,通过ns/name\
- // 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests)
- Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
- // 启动controller,类似于自定义的Run()
- Start(ctx context.Context) error
- GetLogger() logr.Logger
- }
复制代码 controller structure
在 controller-runtime\pkg\internal\controller\controller.go 中实现了这个 Controller- type Controller struct {
- Name string // controller的标识
-
- MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1
- // 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFunc
- Do reconcile.Reconciler
- // makeQueue会构建一个对应的队列,就是返回一个限速队列
- MakeQueue func() workqueue.RateLimitingInterface
- // MakeQueue创造出来的,在出入队列就是操作的这个
- Queue workqueue.RateLimitingInterface
- // 用于注入其他内容
- // 已弃用
- SetFields func(i interface{}) error
- mu sync.Mutex
- // 标识开始的状态
- Started bool
- // 在启动时传递的上下文,用于停止控制器
- ctx context.Context
- // 等待缓存同步的时间 默认2分钟
- CacheSyncTimeout time.Duration
- // 维护了eventHandler predicates,在控制器启动时启动
- startWatches []watchDescription
- // 日志构建器,输出入日志
- LogConstructor func(request *reconcile.Request) logr.Logger
- // RecoverPanic为是否对reconcile引起的panic恢复
- RecoverPanic bool
- }
复制代码 看完了controller的structure,接下来看看controller是如何使用的
injection
Controller.Watch 实现了注入的动作,可以看到 watch() 通过参数将 对应的事件函数传入到内部- func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- // 使用SetFields来完成注入操作
- if err := c.SetFields(src); err != nil {
- return err
- }
- if err := c.SetFields(evthdler); err != nil {
- return err
- }
- for _, pr := range prct {
- if err := c.SetFields(pr); err != nil {
- return err
- }
- }
- // 如果Controller还未启动,那么将这些动作缓存到本地
- if !c.Started {
- c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
- return nil
- }
- c.LogConstructor(nil).Info("Starting EventSource", "source", src)
- return src.Start(c.ctx, evthdler, c.Queue, prct...)
- }
复制代码 启动操作实际上为informer注入事件函数- type Source interface {
- // start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。
- Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
- }
- func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
- prct ...predicate.Predicate) error {
- // Informer should have been specified by the user.
- if is.Informer == nil {
- return fmt.Errorf("must specify Informer.Informer")
- }
- is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
- return nil
- }
复制代码 我们知道对于 eventHandler,实际上应该是一个 onAdd,onUpdate 这种类型的函数,queue则是workqueue,那么 Predicates 是什么呢?
通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。- // Predicate filters events before enqueuing the keys.
- type Predicate interface {
- // Create returns true if the Create event should be processed
- Create(event.CreateEvent) bool
- // Delete returns true if the Delete event should be processed
- Delete(event.DeleteEvent) bool
- // Update returns true if the Update event should be processed
- Update(event.UpdateEvent) bool
- // Generic returns true if the Generic event should be processed
- Generic(event.GenericEvent) bool
- }
复制代码 在对应的动作中,可以看到这里作为过滤操作- func (e EventHandler) OnAdd(obj interface{}) {
- c := event.CreateEvent{}
- // Pull Object out of the object
- if o, ok := obj.(client.Object); ok {
- c.Object = o
- } else {
- log.Error(nil, "OnAdd missing Object",
- "object", obj, "type", fmt.Sprintf("%T", obj))
- return
- }
- for _, p := range e.Predicates {
- if !p.Create(c) {
- return
- }
- }
- // Invoke create handler
- e.EventHandler.Create(c, e.Queue)
- }
复制代码 上面就看到了,对应是 EventHandler.Create 进行添加的,那么这些动作具体是在做什么呢?
在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。- func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
- if evt.Object == nil {
- enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
- return
- }
- q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
- Name: evt.Object.GetName(),
- Namespace: evt.Object.GetNamespace(),
- }})
- }
复制代码 unqueue
上面看到了,入队的动作实际上都是将 ns/name 加入到队列中,那么出队列时又做了些什么呢?
通过 controller.Start() 可以看到controller在启动后都做了些什么动作- func (c *Controller) Start(ctx context.Context) error {
- c.mu.Lock()
- if c.Started {
- return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
- }
- c.initMetrics()
- // Set the internal context.
- c.ctx = ctx
- c.Queue = c.MakeQueue() // 初始化queue
- go func() { // 退出时,让queue关闭
- <-ctx.Done()
- c.Queue.ShutDown()
- }()
- wg := &sync.WaitGroup{}
- err := func() error {
- defer c.mu.Unlock()
- defer utilruntime.HandleCrash()
- // 启动informer前,将之前准备好的 evnetHandle predictates source注册
- for _, watch := range c.startWatches {
- c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
- // 上面我们看过了,start就是真正的注册动作
- if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
- return err
- }
- }
- // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
- c.LogConstructor(nil).Info("Starting Controller")
- // startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面,
- // 这里是拿出来将其启动
- for _, watch := range c.startWatches {
- syncingSource, ok := watch.src.(source.SyncingSource)
- if !ok {
- continue
- }
- if err := func() error {
- // use a context with timeout for launching sources and syncing caches.
- sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
- defer cancel()
- // WaitForSync waits for a definitive timeout, and returns if there
- // is an error or a timeout
- if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
- err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
- c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
- return err
- }
- return nil
- }(); err != nil {
- return err
- }
- }
- // which won't be garbage collected if we hold a reference to it.
- c.startWatches = nil
- // Launch workers to process resources
- c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
- wg.Add(c.MaxConcurrentReconciles)
- // 启动controller消费端的线程
- for i := 0; i < c.MaxConcurrentReconciles; i++ {
- go func() {
- defer wg.Done()
- for c.processNextWorkItem(ctx) {
- }
- }()
- }
- c.Started = true
- return nil
- }()
- if err != nil {
- return err
- }
- <-ctx.Done() // 阻塞,直到上下文关闭
- c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
- wg.Wait() // 等待所有线程都关闭
- c.LogConstructor(nil).Info("All workers finished")
- return nil
- }
复制代码 manager.New()
start Manager
接下来是manager的启动,也就是对应的 start() 与 doWatch()
通过下述代码我们可以看出来,对于 doWatch() 就是把 compete() 前的一些资源的事件函数都注入到controller 中- func (c *Controller) processNextWorkItem(ctx context.Context) bool {
- obj, shutdown := c.Queue.Get() // 从队列中拿取数据
- if shutdown {
- return false
- }
- defer c.Queue.Done(obj)
- // 下面应该是prometheus指标的一些东西
- ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
- defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
- // 获得的对象通过reconcileHandler处理
- c.reconcileHandler(ctx, obj)
- return true
- }
复制代码 由于前两部 builder 的操作将 mgr 指针传入到 builder中,并且操作了 complete() ,也就是操作了 build() ,这代表了对 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是将controller启动
[code]func (cm *controllerManager) Start(ctx context.Context) (err error) { cm.Lock() if cm.started { cm.Unlock() return errors.New("manager already started") } var ready bool defer func() { if !ready { cm.Unlock() } }() // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // 这个channel代表了controller的停止 stopComplete := make(chan struct{}) defer close(stopComplete) // This must be deferred after closing stopComplete, otherwise we deadlock. defer func() { stopErr := cm.engageStopProcedure(stopComplete) if stopErr != nil { if err != nil { err = kerrors.NewAggregate([]error{err, stopErr}) } else { err = stopErr } } }() // Add the cluster runnable. if err := cm.add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } // 指标类 if cm.metricsListener != nil { cm.serveMetrics() } if cm.healthProbeListener != nil { cm.serveHealthProbes() } if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 等待informer同步完成 if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 非选举模式,runnable将在cache同步完成后启动 if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Start the leader election and all required runnables. { ctx, cancel := context.WithCancel(context.Background()) cm.leaderElectionCancel = cancel go func() { if cm.resourceLock != nil { if err := cm.startLeaderElection(ctx); err != nil { cm.errChan |