Introduction to kubelet
Kubernetes is divided into a control plane and a data plane, and the kubelet is the data plane's most important component. It is an agent that runs on every cluster node, responsible for creating, starting, stopping, and monitoring containers, collecting logs, and so on, and for ensuring that the containers on its node run correctly according to their PodSpecs (Pod definition files).
The kubelet performs several important functions:
- Pod lifecycle management: the kubelet creates, starts, and terminates containers according to the PodSpecs it receives from the API server, and ensures that the containers in each Pod run as expected.
- Node status monitoring: the kubelet periodically monitors the state of the node and its containers and reports it back to the control plane, so that other components in the cluster can make scheduling decisions accordingly.
- Resource management: the kubelet manages the resources allocated to each Pod, including CPU, memory, and disk storage.
- Health checks: the kubelet runs container health checks and decides, based on the results, whether a container needs to be restarted (see the probe sketch after this list).
- Communication with the container runtime: the kubelet talks to the container runtime (Docker, containerd, etc.) to manage container lifecycles.
- Secret and configuration management: the kubelet mounts Secrets, ConfigMaps, and similar objects into a Pod's containers so that applications can access their configuration.
- Service discovery and load balancing: the kubelet does not handle service discovery directly, but it supports in-container service discovery mechanisms by setting up network rules and environment variables.
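To make the health-check item concrete: probes are declared on the container spec, and the kubelet's prober executes them. A minimal, self-contained sketch using the client-go corev1 types (the ProbeHandler field name assumes Kubernetes ≥ 1.23; the image and endpoint are hypothetical):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// Hypothetical container: the kubelet will GET /healthz on port 8080
	// every 10s and restart the container after 3 consecutive failures.
	container := corev1.Container{
		Name:  "app",
		Image: "example.com/app:v1", // hypothetical image
		LivenessProbe: &corev1.Probe{
			ProbeHandler: corev1.ProbeHandler{
				HTTPGet: &corev1.HTTPGetAction{
					Path: "/healthz",
					Port: intstr.FromInt(8080),
				},
			},
			InitialDelaySeconds: 5,
			PeriodSeconds:       10,
			FailureThreshold:    3,
		},
	}
	fmt.Printf("liveness: GET %s on port %v\n",
		container.LivenessProbe.HTTPGet.Path,
		container.LivenessProbe.HTTPGet.Port)
}
```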
kubelet Architecture

The kubelet is built out of a large number of components; below is a brief introduction to the most important ones (a toy sketch of how they hang off the Kubelet struct follows the list):
- Sync Loop: the core of the kubelet's activity, responsible for synchronizing Pod state. The sync loop periodically receives PodSpecs from the API server and ensures that the current state of the containers matches those specs.
- PodConfig: turns the individual configuration sources into PodSpecs; the available sources are the kube-apiserver, local files, and HTTP endpoints.
- PLEG (Pod Lifecycle Event Generator): watches for and caches Pod lifecycle events, such as containers being created, started, or stopped, and notifies the Sync Loop about them.
- PodWorkers: manage the processing of Pod lifecycle events. When PLEG detects a new event, the PodWorkers are invoked to handle it: starting a new Pod, updating an existing one, or stopping and cleaning up one that is no longer needed.
- PodManager: stores the desired state of the Pods that the kubelet serves, across its different sources.
- ContainerRuntime: as the name implies, the container runtime; interacts with a high-level container runtime that implements the CRI specification.
- StatsProvider: provides node and container statistics; there are two implementations, one based on cAdvisor and one on CRI.
- ProbeManager: runs container health checks: Liveness, Startup, and Readiness probes.
- VolumeManager: manages a Pod's volumes, including mounting and unmounting them.
- ImageManager: manages images: pulling, deleting, image GC, and so on.
- DeviceManager: manages devices such as GPUs and RDMA NICs.
- PluginManager: runs a set of asynchronous loops that determine which plugins need to be registered or unregistered on this node and carries that out, e.g. CSI drivers and Device Plugins.
- CertificateManager: handles certificate rotation.
- OOMWatcher: extracts container OOM records from the system log, wraps them as events, and records them.
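These components map more or less one-to-one onto fields of the Kubelet struct in pkg/kubelet/kubelet.go. A toy, compilable illustration of that layout (the interface stand-ins are placeholders, not the upstream types, and upstream field names vary between versions):

```go
package main

// Placeholder stand-ins for the real manager types, which live in
// packages under pkg/kubelet/ in the Kubernetes repo.
type (
	PodManager    interface{}
	PodWorkers    interface{}
	PLEG          interface{}
	ProbeManager  interface{}
	VolumeManager interface{}
	ImageManager  interface{}
	StatsProvider interface{}
)

// Kubelet mirrors the upstream layout: one struct wiring together the
// components described above.
type Kubelet struct {
	podManager    PodManager    // desired state of pods, from all config sources
	podWorkers    PodWorkers    // per-pod workers that apply changes
	pleg          PLEG          // pod lifecycle event generator
	probeManager  ProbeManager  // liveness/readiness/startup probes
	volumeManager VolumeManager // mount/unmount of pod volumes
	imageManager  ImageManager  // image pulls and image GC
	statsProvider StatsProvider // node/container stats (cAdvisor or CRI)
}

func main() { _ = Kubelet{} }
```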
Flow
Startup begins in cmd/kubelet, which initializes the configuration from the command-line flags, then builds the Bootstrap interface defined in pkg/kubelet (the Kubelet struct implements it) and calls its Run method to start the kubelet:

```go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	go k.ListenAndServePodResources()
}
```

Note that everything here is started on goroutines: k.Run consumes the Updates() channel from PodConfig and blocks in the sync loop, while the servers run alongside it.
Bootstrap
```go
// Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
type Bootstrap interface {
	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
	BirthCry()
	StartGarbageCollection()
	ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
	ListenAndServeReadOnly(address net.IP, port uint)
	ListenAndServePodResources()
	Run(<-chan kubetypes.PodUpdate)
	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

type Kubelet struct {
	// ...
}
```
The heart of Run is the sync loop; each iteration is handled by syncLoopIteration (invoked from syncLoop, shown further below). First, an explanation of its parameters:
- configCh: dispatches Pods whose configuration has changed to the appropriate handler callback
- plegCh: updates the runtime cache and syncs the affected Pod
- syncCh: syncs all Pods that are waiting to be synced
- housekeepingCh: triggers cleanup of Pods
- health manager: syncs Pods that have failed, or in which one or more containers have failed a health check
The code flow is as follows (a condensed sketch follows this list):
- if the updates channel has a message, dispatch it to the corresponding handler method
- if plegCh has a message, sync via the handler's HandlePodSyncs
- if syncCh fires, the periodic sync interval has arrived; perform the sync
- if one of the three kinds of probe updates arrives, sync via handleProbeSync
- if housekeepingCh fires, clean up via the handler's HandlePodCleanups
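A condensed sketch of syncLoopIteration, simplified from the upstream source (several cases and most logging are trimmed, so treat the details as approximate):

```go
// Simplified sketch of syncLoopIteration: one turn of the sync loop.
// Returns false only when the config channel is closed.
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate,
	handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time,
	plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		if !open {
			return false // the source closed; stop the loop
		}
		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			handler.HandlePodRemoves(u.Pods)
			// RECONCILE, DELETE, SET, ... elided
		}
	case e := <-plegCh:
		// a runtime-originated event: re-sync the affected pod
		if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-syncCh:
		// periodic sync of every pod that is due
		if podsToSync := kl.getPodsToSync(); len(podsToSync) > 0 {
			handler.HandlePodSyncs(podsToSync)
		}
	case update := <-kl.livenessManager.Updates():
		// probe results (liveness here; readiness/startup are analogous)
		handleProbeSync(kl, update, handler, "liveness", "unhealthy")
	case <-housekeepingCh:
		handler.HandlePodCleanups(ctx)
	}
	return true
}
```

Garbage collection, by contrast, does not run inside the sync loop: StartGarbageCollection, called during startup, launches separate container-GC and image-GC goroutines: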
```go
func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		ctx := context.Background()
		if err := kl.containerGC.GarbageCollect(ctx); err != nil {
			klog.ErrorS(err, "Container garbage collection failed")
			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel klog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}
			klog.V(vLevel).InfoS("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)

	// when the high threshold is set to 100, stub the image GC manager
	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
		klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100, Disable image GC")
		return
	}

	prevImageGCFailed := false
	go wait.Until(func() {
		ctx := context.Background()
		if err := kl.imageManager.GarbageCollect(ctx); err != nil {
			if prevImageGCFailed {
				klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
				// Only create an event for repeated failures
				kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
			} else {
				klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
			}
			prevImageGCFailed = true
		} else {
			var vLevel klog.Level = 4
			if prevImageGCFailed {
				vLevel = 1
				prevImageGCFailed = false
			}
			klog.V(vLevel).InfoS("Image garbage collection succeeded")
		}
	}, ImageGCPeriod, wait.NeverStop)
}
```
handleProbeSync likewise performs the synchronization through the handler's HandlePodSyncs.
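For reference, a condensed sketch of handleProbeSync, paraphrased from the upstream source (logging trimmed):

```go
// handleProbeSync (sketch): resolve the probe update to its pod and re-sync it.
func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
	// Look the pod up fresh from the pod manager; the pod embedded in the
	// update is never refreshed after initialization.
	pod, ok := kl.podManager.GetPodByUID(update.PodUID)
	if !ok {
		return // the pod is gone; nothing to sync
	}
	handler.HandlePodSyncs([]*v1.Pod{pod})
}
```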
handler (SyncHandler)
```go
// RunOnce polls from one configuration update and run the associated pods.
func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) {
	ctx := context.Background()
	// Setup filesystem directories.
	if err := kl.setupDataDirs(); err != nil {
		return nil, err
	}

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
		}
	}

	select {
	case u := <-updates:
		klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
		result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay)
		klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
		return result, err
	case <-time.After(runOnceManifestDelay):
		return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)
	}
}

// runOnce runs a given set of pods and returns their status.
func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
	ch := make(chan RunPodResult)
	admitted := []*v1.Pod{}
	for _, pod := range pods {
		// Check if we can admit the pod.
		if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok {
			kl.rejectPod(pod, reason, message)
			results = append(results, RunPodResult{pod, nil})
			continue
		}

		admitted = append(admitted, pod)
		go func(pod *v1.Pod) {
			err := kl.runPod(ctx, pod, retryDelay)
			ch <- RunPodResult{pod, err}
		}(pod)
	}

	klog.InfoS("Waiting for pods", "numPods", len(admitted))
	failedPods := []string{}
	for i := 0; i < len(admitted); i++ {
		res := <-ch
		results = append(results, res)
		if res.Err != nil {
			failedContainerName, err := kl.getFailedContainers(ctx, res.Pod)
			if err != nil {
				klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
			} else {
				klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
			}
			failedPods = append(failedPods, format.Pod(res.Pod))
		} else {
			klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
		}
	}
	if len(failedPods) > 0 {
		return results, fmt.Errorf("error running pods: %v", failedPods)
	}
	klog.InfoS("Pods started", "numPods", len(pods))
	return results, err
}
```
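RunOnce backs the kubelet's standalone run-once mode (the long-deprecated --runonce flag): it processes a single configuration update, starts the pods, and returns instead of entering the sync loop.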
Again, it is the Kubelet struct that implements this interface:

```go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	ctx := context.Background()
	if kl.logServer == nil {
		file := http.FileServer(http.Dir(nodeLogDir))
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
			kl.logServer = http.StripPrefix("/logs/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
				if nlq, errs := newNodeLogQuery(req.URL.Query()); len(errs) > 0 {
					http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
					return
				} else if nlq != nil {
					if req.URL.Path != "/" && req.URL.Path != "" {
						http.Error(w, "path not allowed in query mode", http.StatusNotAcceptable)
						return
					}
					if errs := nlq.validate(); len(errs) > 0 {
						http.Error(w, errs.ToAggregate().Error(), http.StatusNotAcceptable)
						return
					}
					// Validation ensures that the request does not query services and files at the same time
					if len(nlq.Services) > 0 {
						journal.ServeHTTP(w, req)
						return
					}
					// Validation ensures that the request does not explicitly query multiple files at the same time
					if len(nlq.Files) == 1 {
						// Account for the \ being used on Windows clients
						req.URL.Path = filepath.ToSlash(nlq.Files[0])
					}
				}
				// Fall back in case the caller is directly trying to query a file
				// Example: kubectl get --raw /api/v1/nodes/$name/proxy/logs/foo.log
				file.ServeHTTP(w, req)
			}))
		} else {
			kl.logServer = http.StripPrefix("/logs/", file)
		}
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start two go-routines to update the status.
		//
		// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
		// while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
		// once the node becomes ready, then exits afterwards.
		//
		// Introduce some small jittering to ensure that over time the requests won't start
		// accumulating at approximately the same time from the set of nodes due to priority and
		// fairness effect.
		go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		go kl.nodeLeaseController.Run(context.Background())
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start component sync loops.
	kl.statusManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()

	// Start eventedPLEG only if EventedPLEG feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
		kl.eventedPleg.Start()
	}

	kl.syncLoop(ctx, updates, kl)
}
```
As you can see, these handler functions essentially hand Pods off to the podWorkers for processing; a sketch of that dispatch follows.
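For example, HandlePodAdditions ends up calling podWorkers.UpdatePod for each accepted Pod. A heavily condensed sketch (admission checks, mirror-pod handling, and bookkeeping omitted):

```go
// Condensed sketch of HandlePodAdditions: every accepted pod is dispatched
// to the pod workers, which maintain one worker goroutine per pod.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.AddPod(pod) // record the desired state
		// ... admission checks and mirror-pod handling elided ...
		kl.podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			UpdateType: kubetypes.SyncPodCreate,
			StartTime:  start,
		})
	}
}
```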
podconfig
The updates channel used above is obtained from PodConfig, so let's look at how PodConfig works. For context, here is syncLoop, the consumer of that channel:

```go
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base

	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
```
This is where PodConfig is defined: a struct that can hold multiple sources (a trimmed sketch follows). So what sources are there?
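A trimmed sketch of PodConfig from pkg/kubelet/config/config.go (fields and signatures simplified; details vary by version):

```go
// PodConfig (sketch): a mux of many pod-config sources into one merged,
// ordered stream of PodUpdate events.
type PodConfig struct {
	pods *podStorage // current pod state, tracked per source
	mux  *mux        // fans per-source channels into the merger

	// the merged stream of denormalized changes, consumed by syncLoop
	updates chan kubetypes.PodUpdate

	// the set of registered source names: "api", "file", "http"
	sourcesLock sync.Mutex
	sources     sets.String
}

// Channel returns a channel that a single source (for example the
// apiserver watcher) pushes its raw updates into.
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} { /* ... */ }

// Updates returns the merged stream; this is what startKubelet passes
// to Kubelet.Run as the updates parameter.
func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate { return c.updates }
```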
kube-apiserver
```go
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
	// ...
}
```
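In the upstream source, this builds a ListWatch on the pods resource filtered by spec.nodeName, waits until the node has synced, and then funnels the watched Pods into the updates channel that PodConfig merges.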