ToB企服应用市场:ToB评测及商务社交产业平台

标题: kubelet 原理分析 [打印本页]

作者: 农民    时间: 2023-12-9 13:59
标题: kubelet 原理分析
kubelet 简介

kubernetes 分为控制面和数据面,kubelet 就是数据面最主要的组件,在每个节点上启动,主要负责容器的创建、启停、监控、日志收集等工作。它是一个在每个集群节点上运行的代理,负责确保节点上的容器根据PodSpec(Pod定义文件)正确运行。
Kubelet执行以下几项重要功能:
kubelet 架构


kubelet 的架构由 N 多的组件组成,下面简单介绍下比较重要的几个:
流程

首先在 cmd/kubelet 中使用传入命令行参数的方式初始化配置,然后创建 pkg/kubelet 中的 Bootstrap inferface, kubelet struct 实现了这个接口, 然后调用 Run 方法启动 kubelet。
  1. func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
  2.         // start the kubelet
  3.         go k.Run(podCfg.Updates())
  4.         // start the kubelet server
  5.         if enableServer {
  6.                 go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
  7.         }
  8.         if kubeCfg.ReadOnlyPort > 0 {
  9.                 go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
  10.         }
  11.         go k.ListenAndServePodResources()
  12. }
复制代码
Bootstrap
  1. // Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
  2. type Bootstrap interface {
  3.         GetConfiguration() kubeletconfiginternal.KubeletConfiguration
  4.         BirthCry()
  5.         StartGarbageCollection()
  6.         ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
  7.         ListenAndServeReadOnly(address net.IP, port uint)
  8.         ListenAndServePodResources()
  9.         Run(<-chan kubetypes.PodUpdate)
  10.         RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
  11. }
  12. type Kubelet struct {
  13.     // ...
  14. }
复制代码
首先解释一下这个函数的参数:
代码流程为:
  1. func (kl *Kubelet) StartGarbageCollection() {
  2.         loggedContainerGCFailure := false
  3.         go wait.Until(func() {
  4.                 ctx := context.Background()
  5.                 if err := kl.containerGC.GarbageCollect(ctx); err != nil {
  6.                         klog.ErrorS(err, "Container garbage collection failed")
  7.                         kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
  8.                         loggedContainerGCFailure = true
  9.                 } else {
  10.                         var vLevel klog.Level = 4
  11.                         if loggedContainerGCFailure {
  12.                                 vLevel = 1
  13.                                 loggedContainerGCFailure = false
  14.                         }
  15.                         klog.V(vLevel).InfoS("Container garbage collection succeeded")
  16.                 }
  17.         }, ContainerGCPeriod, wait.NeverStop)
  18.         // when the high threshold is set to 100, stub the image GC manager
  19.         if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
  20.                 klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100, Disable image GC")
  21.                 return
  22.         }
  23.         prevImageGCFailed := false
  24.         go wait.Until(func() {
  25.                 ctx := context.Background()
  26.                 if err := kl.imageManager.GarbageCollect(ctx); err != nil {
  27.                         if prevImageGCFailed {
  28.                                 klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
  29.                                 // Only create an event for repeated failures
  30.                                 kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
  31.                         } else {
  32.                                 klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
  33.                         }
  34.                         prevImageGCFailed = true
  35.                 } else {
  36.                         var vLevel klog.Level = 4
  37.                         if prevImageGCFailed {
  38.                                 vLevel = 1
  39.                                 prevImageGCFailed = false
  40.                         }
  41.                         klog.V(vLevel).InfoS("Image garbage collection succeeded")
  42.                 }
  43.         }, ImageGCPeriod, wait.NeverStop)
  44. }
复制代码
handleProbeSync也是使用 handler 的 HandlePodSyncs 做同步
handle (SyncHandler)
  1. // RunOnce polls from one configuration update and run the associated pods.
  2. func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) {
  3.         ctx := context.Background()
  4.         // Setup filesystem directories.
  5.         if err := kl.setupDataDirs(); err != nil {
  6.                 return nil, err
  7.         }
  8.         // If the container logs directory does not exist, create it.
  9.         if _, err := os.Stat(ContainerLogsDir); err != nil {
  10.                 if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
  11.                         klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
  12.                 }
  13.         }
  14.         select {
  15.         case u := <-updates:
  16.                 klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
  17.                 result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay)
  18.                 klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
  19.                 return result, err
  20.         case <-time.After(runOnceManifestDelay):
  21.                 return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)
  22.         }
  23. }
  24. // runOnce runs a given set of pods and returns their status.
  25. func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
  26.         ch := make(chan RunPodResult)
  27.         admitted := []*v1.Pod{}
  28.         for _, pod := range pods {
  29.                 // Check if we can admit the pod.
  30.                 if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok {
  31.                         kl.rejectPod(pod, reason, message)
  32.                         results = append(results, RunPodResult{pod, nil})
  33.                         continue
  34.                 }
  35.                 admitted = append(admitted, pod)
  36.                 go func(pod *v1.Pod) {
  37.                         err := kl.runPod(ctx, pod, retryDelay)
  38.                         ch <- RunPodResult{pod, err}
  39.                 }(pod)
  40.         }
  41.         klog.InfoS("Waiting for pods", "numPods", len(admitted))
  42.         failedPods := []string{}
  43.         for i := 0; i < len(admitted); i++ {
  44.                 res := <-ch
  45.                 results = append(results, res)
  46.                 if res.Err != nil {
  47.                         failedContainerName, err := kl.getFailedContainers(ctx, res.Pod)
  48.                         if err != nil {
  49.                                 klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
  50.                         } else {
  51.                                 klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
  52.                         }
  53.                         failedPods = append(failedPods, format.Pod(res.Pod))
  54.                 } else {
  55.                         klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
  56.                 }
  57.         }
  58.         if len(failedPods) > 0 {
  59.                 return results, fmt.Errorf("error running pods: %v", failedPods)
  60.         }
  61.         klog.InfoS("Pods started", "numPods", len(pods))
  62.         return results, err
  63. }
复制代码
也是 kubelet struct 实现了这个接口
  1. func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
  2.         ctx := context.Background()
  3.         if kl.logServer == nil {
  4.                 file := http.FileServer(http.Dir(nodeLogDir))
  5.                 if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
  6.                         kl.logServer = http.StripPrefix("/logs/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  7.                                 if nlq, errs := newNodeLogQuery(req.URL.Query()); len(errs) > 0 {
  8.                                         http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
  9.                                         return
  10.                                 } else if nlq != nil {
  11.                                         if req.URL.Path != "/" && req.URL.Path != "" {
  12.                                                 http.Error(w, "path not allowed in query mode", http.StatusNotAcceptable)
  13.                                                 return
  14.                                         }
  15.                                         if errs := nlq.validate(); len(errs) > 0 {
  16.                                                 http.Error(w, errs.ToAggregate().Error(), http.StatusNotAcceptable)
  17.                                                 return
  18.                                         }
  19.                                         // Validation ensures that the request does not query services and files at the same time
  20.                                         if len(nlq.Services) > 0 {
  21.                                                 journal.ServeHTTP(w, req)
  22.                                                 return
  23.                                         }
  24.                                         // Validation ensures that the request does not explicitly query multiple files at the same time
  25.                                         if len(nlq.Files) == 1 {
  26.                                                 // Account for the \ being used on Windows clients
  27.                                                 req.URL.Path = filepath.ToSlash(nlq.Files[0])
  28.                                         }
  29.                                 }
  30.                                 // Fall back in case the caller is directly trying to query a file
  31.                                 // Example: kubectl get --raw /api/v1/nodes/$name/proxy/logs/foo.log
  32.                                 file.ServeHTTP(w, req)
  33.                         }))
  34.                 } else {
  35.                         kl.logServer = http.StripPrefix("/logs/", file)
  36.                 }
  37.         }
  38.         if kl.kubeClient == nil {
  39.                 klog.InfoS("No API server defined - no node status update will be sent")
  40.         }
  41.         // Start the cloud provider sync manager
  42.         if kl.cloudResourceSyncManager != nil {
  43.                 go kl.cloudResourceSyncManager.Run(wait.NeverStop)
  44.         }
  45.         if err := kl.initializeModules(); err != nil {
  46.                 kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
  47.                 klog.ErrorS(err, "Failed to initialize internal modules")
  48.                 os.Exit(1)
  49.         }
  50.         // Start volume manager
  51.         go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
  52.         if kl.kubeClient != nil {
  53.                 // Start two go-routines to update the status.
  54.                 //
  55.                 // The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
  56.                 // while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
  57.                 // once the node becomes ready, then exits afterwards.
  58.                 //
  59.                 // Introduce some small jittering to ensure that over time the requests won't start
  60.                 // accumulating at approximately the same time from the set of nodes due to priority and
  61.                 // fairness effect.
  62.                 go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
  63.                 go kl.fastStatusUpdateOnce()
  64.                 // start syncing lease
  65.                 go kl.nodeLeaseController.Run(context.Background())
  66.         }
  67.         go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
  68.         // Set up iptables util rules
  69.         if kl.makeIPTablesUtilChains {
  70.                 kl.initNetworkUtil()
  71.         }
  72.         // Start component sync loops.
  73.         kl.statusManager.Start()
  74.         // Start syncing RuntimeClasses if enabled.
  75.         if kl.runtimeClassManager != nil {
  76.                 kl.runtimeClassManager.Start(wait.NeverStop)
  77.         }
  78.         // Start the pod lifecycle event generator.
  79.         kl.pleg.Start()
  80.         // Start eventedPLEG only if EventedPLEG feature gate is enabled.
  81.         if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
  82.                 kl.eventedPleg.Start()
  83.         }
  84.         kl.syncLoop(ctx, updates, kl)
  85. }
复制代码
可以看到这些函数基本都是把pod交给 podWorkers 去处理
podconfig

上文中的 updates channel 是从 podconfig 中获取的 那就来看看 podconfig 是怎么工作的
  1. // syncLoop is the main loop for processing changes. It watches for changes from
  2. // three channels (file, apiserver, and http) and creates a union of them. For
  3. // any new change seen, will run a sync against desired state and running state. If
  4. // no changes are seen to the configuration, will synchronize the last known desired
  5. // state every sync-frequency seconds. Never returns.
  6. func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  7.         klog.InfoS("Starting kubelet main sync loop")
  8.         // The syncTicker wakes up kubelet to checks if there are any pod workers
  9.         // that need to be sync'd. A one-second period is sufficient because the
  10.         // sync interval is defaulted to 10s.
  11.         syncTicker := time.NewTicker(time.Second)
  12.         defer syncTicker.Stop()
  13.         housekeepingTicker := time.NewTicker(housekeepingPeriod)
  14.         defer housekeepingTicker.Stop()
  15.         plegCh := kl.pleg.Watch()
  16.         const (
  17.                 base   = 100 * time.Millisecond
  18.                 max    = 5 * time.Second
  19.                 factor = 2
  20.         )
  21.         duration := base
  22.         // Responsible for checking limits in resolv.conf
  23.         // The limits do not have anything to do with individual pods
  24.         // Since this is called in syncLoop, we don't need to call it anywhere else
  25.         if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
  26.                 kl.dnsConfigurer.CheckLimitsForResolvConf()
  27.         }
  28.         for {
  29.                 if err := kl.runtimeState.runtimeErrors(); err != nil {
  30.                         klog.ErrorS(err, "Skipping pod synchronization")
  31.                         // exponential backoff
  32.                         time.Sleep(duration)
  33.                         duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
  34.                         continue
  35.                 }
  36.                 // reset backoff if we have a success
  37.                 duration = base
  38.                 kl.syncLoopMonitor.Store(kl.clock.Now())
  39.                 if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
  40.                         break
  41.                 }
  42.                 kl.syncLoopMonitor.Store(kl.clock.Now())
  43.         }
  44. }
复制代码
这里定义了接口 接口提中可以存储多个 source 那么有什么 source 呢
kube-apiserver

[code]func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4