深入解析kubernetes controller-runtime
Overviewcontroller-runtime 是 Kubernetes 社区提供可供快速搭建一套实现了controller 功能的工具,无需自行实现Controller的功能了;在 Kubebuilder与Operator SDK 也是使用 controller-runtime 。本文将对 controller-runtime 的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。
controller-runtime structure
controller-runtime 主要组成是需要用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller 。
[*]Manager:是用户在初始化时创建的,用于启动 Controller Runtime 组件
[*]Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过 code-generator 生成的api-like而实现的controller中的业务处理部分)。
[*]Cache:一个缓存,用来建立 Informer 到 ApiServer 的连接来监听资源并将被监听的对象推送到queue中。
[*]Controller: 一方面向 Informer 注册 eventHandler,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的 Reconciler 功能。
https://img2022.cnblogs.com/blog/1380340/202206/1380340-20220627221136307-385951010.png
图:controller-runtime structurehttps://img2022.cnblogs.com/blog/1380340/202206/1380340-20220627221151306-444229986.png
图:controller-runtime flowchart由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。
Controller引入
我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。
可以看到 example 下的实际上实现了一个 reconciler的结构体,实现了 Reconciler 抽象和 Client 结构体
type reconciler struct {
client.Client
scheme *runtime.Scheme
}那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,这个是具体处理的逻辑过程
type Reconciler interface {
Reconcile(context.Context, Request) (Result, error)
}下面在看下谁来实现了这个 Reconciler 抽象
type Controller interface {
reconcile.Reconciler // 协调的具体步骤,通过ns/name\
// 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests)
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// 启动controller,类似于自定义的Run()
Start(ctx context.Context) error
GetLogger() logr.Logger
}controller structure
在 controller-runtime\pkg\internal\controller\controller.go 中实现了这个 Controller
type Controller struct {
Name string // controller的标识
MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1
// 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFunc
Do reconcile.Reconciler
// makeQueue会构建一个对应的队列,就是返回一个限速队列
MakeQueue func() workqueue.RateLimitingInterface
// MakeQueue创造出来的,在出入队列就是操作的这个
Queue workqueue.RateLimitingInterface
// 用于注入其他内容
// 已弃用
SetFields func(i interface{}) error
mu sync.Mutex
// 标识开始的状态
Started bool
// 在启动时传递的上下文,用于停止控制器
ctx context.Context
// 等待缓存同步的时间 默认2分钟
CacheSyncTimeout time.Duration
// 维护了eventHandler predicates,在控制器启动时启动
startWatches []watchDescription
// 日志构建器,输出入日志
LogConstructor func(request *reconcile.Request) logr.Logger
// RecoverPanic为是否对reconcile引起的panic恢复
RecoverPanic bool
}看完了controller的structure,接下来看看controller是如何使用的
injection
Controller.Watch 实现了注入的动作,可以看到 watch() 通过参数将 对应的事件函数传入到内部
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
c.mu.Lock()
defer c.mu.Unlock()
// 使用SetFields来完成注入操作
if err := c.SetFields(src); err != nil {
return err
}
if err := c.SetFields(evthdler); err != nil {
return err
}
for _, pr := range prct {
if err := c.SetFields(pr); err != nil {
return err
}
}
// 如果Controller还未启动,那么将这些动作缓存到本地
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}启动操作实际上为informer注入事件函数
type Source interface {
// start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Informer should have been specified by the user.
if is.Informer == nil {
return fmt.Errorf("must specify Informer.Informer")
}
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}我们知道对于 eventHandler,实际上应该是一个 onAdd,onUpdate 这种类型的函数,queue则是workqueue,那么 Predicates 是什么呢?
通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。
// Predicate filters events before enqueuing the keys.
type Predicate interface {
// Create returns true if the Create event should be processed
Create(event.CreateEvent) bool
// Delete returns true if the Delete event should be processed
Delete(event.DeleteEvent) bool
// Update returns true if the Update event should be processed
Update(event.UpdateEvent) bool
// Generic returns true if the Generic event should be processed
Generic(event.GenericEvent) bool
}在对应的动作中,可以看到这里作为过滤操作
func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}
// Invoke create handler
e.EventHandler.Create(c, e.Queue)
}上面就看到了,对应是 EventHandler.Create 进行添加的,那么这些动作具体是在做什么呢?
在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}unqueue
上面看到了,入队的动作实际上都是将 ns/name 加入到队列中,那么出队列时又做了些什么呢?
通过 controller.Start()可以看到controller在启动后都做了些什么动作
func (c *Controller) Start(ctx context.Context) error {
c.mu.Lock()
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}
c.initMetrics()
// Set the internal context.
c.ctx = ctx
c.Queue = c.MakeQueue() // 初始化queue
go func() { // 退出时,让queue关闭
<-ctx.Done()
c.Queue.ShutDown()
}()
wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()
defer utilruntime.HandleCrash()
// 启动informer前,将之前准备好的 evnetHandle predictates source注册
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
// 上面我们看过了,start就是真正的注册动作
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.LogConstructor(nil).Info("Starting Controller")
// startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面,
// 这里是拿出来将其启动
for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := func() error {
// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
return err
}
return nil
}(); err != nil {
return err
}
}
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil
// Launch workers to process resources
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
// 启动controller消费端的线程
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
for c.processNextWorkItem(ctx) {
}
}()
}
c.Started = true
return nil
}()
if err != nil {
return err
}
<-ctx.Done() // 阻塞,直到上下文关闭
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait() // 等待所有线程都关闭
c.LogConstructor(nil).Info("All workers finished")
return nil
}manager.New()
start Manager
接下来是manager的启动,也就是对应的 start() 与 doWatch()
通过下述代码我们可以看出来,对于 doWatch() 就是把 compete() 前的一些资源的事件函数都注入到controller 中
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get() // 从队列中拿取数据
if shutdown {
return false
}
defer c.Queue.Done(obj)
// 下面应该是prometheus指标的一些东西
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
// 获得的对象通过reconcileHandler处理
c.reconcileHandler(ctx, obj)
return true
}由于前两部 builder 的操作将 mgr 指针传入到 builder中,并且操作了 complete() ,也就是操作了 build() ,这代表了对 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是将controller启动
func (cm *controllerManager) Start(ctx context.Context) (err error) { cm.Lock() if cm.started { cm.Unlock() return errors.New("manager already started") } var ready bool defer func() { if !ready { cm.Unlock() } }() // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // 这个channel代表了controller的停止 stopComplete := make(chan struct{}) defer close(stopComplete) // This must be deferred after closing stopComplete, otherwise we deadlock. defer func() { stopErr := cm.engageStopProcedure(stopComplete) if stopErr != nil { if err != nil { err = kerrors.NewAggregate([]error{err, stopErr}) } else { err = stopErr } } }() // Add the cluster runnable. if err := cm.add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } // 指标类 if cm.metricsListener != nil { cm.serveMetrics() } if cm.healthProbeListener != nil { cm.serveHealthProbes() } if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 等待informer同步完成 if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 非选举模式,runnable将在cache同步完成后启动 if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Start the leader election and all required runnables. { ctx, cancel := context.WithCancel(context.Background()) cm.leaderElectionCancel = cancel go func() { if cm.resourceLock != nil { if err := cm.startLeaderElection(ctx); err != nil { cm.errChan
页:
[1]