农民 发表于 2023-12-9 13:59:16

kubelet 原理分析

kubelet 简介

kubernetes 分为控制面和数据面,kubelet 就是数据面最主要的组件,在每个节点上启动,主要负责容器的创建、启停、监控、日志收集等工作。它是一个在每个集群节点上运行的代理,负责确保节点上的容器根据PodSpec(Pod定义文件)正确运行。
Kubelet执行以下几项重要功能:

[*]Pod生命周期管理:Kubelet根据从API服务器接收到的PodSpecs创建、启动、终止容器。它负责启动Pod中的容器,并确保它们按预期运行。
[*]节点状态监控:Kubelet定期监控节点和容器的状态,并将状态报告回集群的控制平面。这使得集群中的其他组件能够做出相应的调度决策。
[*]资源管理:Kubelet负责管理分配给每个Pod的资源。这包括CPU、内存和磁盘存储资源。
[*]健康检查:Kubelet可以执行容器健康检查,并根据检查结果决定是否需要重启容器。
[*]与容器运行时的通信:Kubelet与容器运行时(如Docker、containerd等)通信,以管理容器的生命周期。
[*]秘密和配置管理:Kubelet负责将秘密、配置映射等挂载到Pod的容器中,以便应用程序可以访问这些配置。
[*]服务发现和负载均衡:尽管Kubelet本身不直接处理服务发现,但它通过设置网络规则和环境变量来支持容器内的服务发现机制。
kubelet 架构

https://img2023.cnblogs.com/blog/2344773/202311/2344773-20231107221805894-125360121.png
kubelet 的架构由 N 多的组件组成,下面简单介绍下比较重要的几个:

[*]Sync Loop: 这是Kubelet活动的核心,负责同步Pod的状态。同步循环会定期从API服务器获取PodSpecs,并确保容器的当前状态与这些规格相匹配。
[*]PodConfig: 负责将各个配置源转换成 PodSpecs,可以选择的配置源包括:Kube-apiserver、本地文件、HTTP。
[*]PLEG(Pod Lifecycle Event Generator): 负责监测和缓存Pod生命周期事件,如创建、启动或停止容器,然后将这些事件通知 Sync Loop。
[*]PodWorkers: 负责管理 Pod 的生命周期事件处理。当 Pod 生命周期事件 PLEG 检测到新的事件时,PodWorkers 会被调用来处理这些事件,包括启动新的 Pod、更新现有的 Pod、或者停止和清理不再需要的 Pod。
[*]PodManager: 存储 Pod 的期望状态,kubelet 服务的不同渠道的 Pod。
[*]ContainerRuntime: 顾名思义,容器运行时。与遵循 CRI 规范的高级容器运行时进行交互。
[*]StatsProvider: 提供节点和容器的统计信息,有 cAdvisor 和 CRI 两种实现。
[*]ProbeManager: 负责执行容器的健康检查,包括 Liveness,Startup 和 Readiness 检查。
[*]VolumeManager: 负责管理 Pod 的卷,包括挂载和卸载卷。
[*]ImageManager: 负责管理镜像,包括拉取、删除、镜像 GC 等。
[*]DeviceManager: 负责管理设备,包括 GPU、RDMA 等。
[*]PluginManager: PluginManager 运行一组异步循环,根据此节点确定哪些插件需要注册/取消注册并执行。如 CSI 驱动和设备管理器插件(Device Plugin)。
[*]CertificateManager: 处理证书轮换。
[*]OOMWatcher: 从系统日志中获取容器的 OOM 日志,将其封装成事件并记录。
流程

首先在 cmd/kubelet 中使用传入命令行参数的方式初始化配置,然后创建 pkg/kubelet 中的 Bootstrap inferface, kubelet struct 实现了这个接口, 然后调用 Run 方法启动 kubelet。
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
        // start the kubelet
        go k.Run(podCfg.Updates())

        // start the kubelet server
        if enableServer {
                go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
        }
        if kubeCfg.ReadOnlyPort > 0 {
                go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }
        go k.ListenAndServePodResources()
}Bootstrap

// Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
type Bootstrap interface {
        GetConfiguration() kubeletconfiginternal.KubeletConfiguration
        BirthCry()
        StartGarbageCollection()
        ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
        ListenAndServeReadOnly(address net.IP, port uint)
        ListenAndServePodResources()
        Run(<-chan kubetypes.PodUpdate)
        RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

type Kubelet struct {
    // ...
}首先解释一下这个函数的参数:

[*]configCh: 将配置更改的 Pod 分派给适当的处理程序回调函数
[*]plegCh: 更新运行时缓存;同步 Pod
[*]syncCh: 同步所有等待同步的 Pod
[*]housekeepingCh: 触发 Pod 的清理
[*]health manager: 同步失败的 Pod 或其中一个或多个容器的健康检查失败的 Pod
代码流程为:

[*]如果 updates channel 有消息 则使用 handler 调用对应方法做处理
[*]如果 plegCh 有消息 则使用 handler 的 HandlePodSyncs 做同步
[*]如果 syncCh 有消息 代表到了同步时间 做同步操作
[*]如果是 三种 probe 的更新 则使用 handleProbeSync 做同步
[*]如果 housekeepingCh 有消息 则使用 handler 的 HandlePodCleanups 做清理
func (kl *Kubelet) StartGarbageCollection() {
        loggedContainerGCFailure := false
        go wait.Until(func() {
                ctx := context.Background()
                if err := kl.containerGC.GarbageCollect(ctx); err != nil {
                        klog.ErrorS(err, "Container garbage collection failed")
                        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
                        loggedContainerGCFailure = true
                } else {
                        var vLevel klog.Level = 4
                        if loggedContainerGCFailure {
                                vLevel = 1
                                loggedContainerGCFailure = false
                        }

                        klog.V(vLevel).InfoS("Container garbage collection succeeded")
                }
        }, ContainerGCPeriod, wait.NeverStop)

        // when the high threshold is set to 100, stub the image GC manager
        if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
                klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100, Disable image GC")
                return
        }

        prevImageGCFailed := false
        go wait.Until(func() {
                ctx := context.Background()
                if err := kl.imageManager.GarbageCollect(ctx); err != nil {
                        if prevImageGCFailed {
                                klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
                                // Only create an event for repeated failures
                                kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
                        } else {
                                klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
                        }
                        prevImageGCFailed = true
                } else {
                        var vLevel klog.Level = 4
                        if prevImageGCFailed {
                                vLevel = 1
                                prevImageGCFailed = false
                        }

                        klog.V(vLevel).InfoS("Image garbage collection succeeded")
                }
        }, ImageGCPeriod, wait.NeverStop)
}handleProbeSync也是使用 handler 的 HandlePodSyncs 做同步
handle (SyncHandler)

// RunOnce polls from one configuration update and run the associated pods.
func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) {
        ctx := context.Background()
        // Setup filesystem directories.
        if err := kl.setupDataDirs(); err != nil {
                return nil, err
        }

        // If the container logs directory does not exist, create it.
        if _, err := os.Stat(ContainerLogsDir); err != nil {
                if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
                        klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
                }
        }

        select {
        case u := <-updates:
                klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
                result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay)
                klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
                return result, err
        case <-time.After(runOnceManifestDelay):
                return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)
        }
}

// runOnce runs a given set of pods and returns their status.
func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
        ch := make(chan RunPodResult)
        admitted := []*v1.Pod{}
        for _, pod := range pods {
                // Check if we can admit the pod.
                if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok {
                        kl.rejectPod(pod, reason, message)
                        results = append(results, RunPodResult{pod, nil})
                        continue
                }

                admitted = append(admitted, pod)
                go func(pod *v1.Pod) {
                        err := kl.runPod(ctx, pod, retryDelay)
                        ch <- RunPodResult{pod, err}
                }(pod)
        }

        klog.InfoS("Waiting for pods", "numPods", len(admitted))
        failedPods := []string{}
        for i := 0; i < len(admitted); i++ {
                res := <-ch
                results = append(results, res)
                if res.Err != nil {
                        failedContainerName, err := kl.getFailedContainers(ctx, res.Pod)
                        if err != nil {
                                klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
                        } else {
                                klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
                        }
                        failedPods = append(failedPods, format.Pod(res.Pod))
                } else {
                        klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
                }
        }
        if len(failedPods) > 0 {
                return results, fmt.Errorf("error running pods: %v", failedPods)
        }
        klog.InfoS("Pods started", "numPods", len(pods))
        return results, err
}也是 kubelet struct 实现了这个接口
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
        ctx := context.Background()
        if kl.logServer == nil {
                file := http.FileServer(http.Dir(nodeLogDir))
                if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
                        kl.logServer = http.StripPrefix("/logs/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
                                if nlq, errs := newNodeLogQuery(req.URL.Query()); len(errs) > 0 {
                                        http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
                                        return
                                } else if nlq != nil {
                                        if req.URL.Path != "/" && req.URL.Path != "" {
                                                http.Error(w, "path not allowed in query mode", http.StatusNotAcceptable)
                                                return
                                        }
                                        if errs := nlq.validate(); len(errs) > 0 {
                                                http.Error(w, errs.ToAggregate().Error(), http.StatusNotAcceptable)
                                                return
                                        }
                                        // Validation ensures that the request does not query services and files at the same time
                                        if len(nlq.Services) > 0 {
                                                journal.ServeHTTP(w, req)
                                                return
                                        }
                                        // Validation ensures that the request does not explicitly query multiple files at the same time
                                        if len(nlq.Files) == 1 {
                                                // Account for the \ being used on Windows clients
                                                req.URL.Path = filepath.ToSlash(nlq.Files)
                                        }
                                }
                                // Fall back in case the caller is directly trying to query a file
                                // Example: kubectl get --raw /api/v1/nodes/$name/proxy/logs/foo.log
                                file.ServeHTTP(w, req)
                        }))
                } else {
                        kl.logServer = http.StripPrefix("/logs/", file)
                }
        }
        if kl.kubeClient == nil {
                klog.InfoS("No API server defined - no node status update will be sent")
        }

        // Start the cloud provider sync manager
        if kl.cloudResourceSyncManager != nil {
                go kl.cloudResourceSyncManager.Run(wait.NeverStop)
        }

        if err := kl.initializeModules(); err != nil {
                kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
                klog.ErrorS(err, "Failed to initialize internal modules")
                os.Exit(1)
        }

        // Start volume manager
        go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

        if kl.kubeClient != nil {
                // Start two go-routines to update the status.
                //
                // The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
                // while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
                // once the node becomes ready, then exits afterwards.
                //
                // Introduce some small jittering to ensure that over time the requests won't start
                // accumulating at approximately the same time from the set of nodes due to priority and
                // fairness effect.
                go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
                go kl.fastStatusUpdateOnce()

                // start syncing lease
                go kl.nodeLeaseController.Run(context.Background())
        }
        go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

        // Set up iptables util rules
        if kl.makeIPTablesUtilChains {
                kl.initNetworkUtil()
        }

        // Start component sync loops.
        kl.statusManager.Start()

        // Start syncing RuntimeClasses if enabled.
        if kl.runtimeClassManager != nil {
                kl.runtimeClassManager.Start(wait.NeverStop)
        }

        // Start the pod lifecycle event generator.
        kl.pleg.Start()

        // Start eventedPLEG only if EventedPLEG feature gate is enabled.
        if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
                kl.eventedPleg.Start()
        }

        kl.syncLoop(ctx, updates, kl)
}可以看到这些函数基本都是把pod交给 podWorkers 去处理
podconfig

上文中的 updates channel 是从 podconfig 中获取的 那就来看看 podconfig 是怎么工作的
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        klog.InfoS("Starting kubelet main sync loop")
        // The syncTicker wakes up kubelet to checks if there are any pod workers
        // that need to be sync'd. A one-second period is sufficient because the
        // sync interval is defaulted to 10s.
        syncTicker := time.NewTicker(time.Second)
        defer syncTicker.Stop()
        housekeepingTicker := time.NewTicker(housekeepingPeriod)
        defer housekeepingTicker.Stop()
        plegCh := kl.pleg.Watch()
        const (
                base   = 100 * time.Millisecond
                max    = 5 * time.Second
                factor = 2
        )
        duration := base
        // Responsible for checking limits in resolv.conf
        // The limits do not have anything to do with individual pods
        // Since this is called in syncLoop, we don't need to call it anywhere else
        if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
                kl.dnsConfigurer.CheckLimitsForResolvConf()
        }

        for {
                if err := kl.runtimeState.runtimeErrors(); err != nil {
                        klog.ErrorS(err, "Skipping pod synchronization")
                        // exponential backoff
                        time.Sleep(duration)
                        duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
                        continue
                }
                // reset backoff if we have a success
                duration = base

                kl.syncLoopMonitor.Store(kl.clock.Now())
                if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                        break
                }
                kl.syncLoopMonitor.Store(kl.clock.Now())
        }
}这里定义了接口 接口提中可以存储多个 source 那么有什么 source 呢
kube-apiserver

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan
页: [1]
查看完整版本: kubelet 原理分析