kubelet 原理分析
kubelet 简介kubernetes 分为控制面和数据面,kubelet 就是数据面最主要的组件,在每个节点上启动,主要负责容器的创建、启停、监控、日志收集等工作。它是一个在每个集群节点上运行的代理,负责确保节点上的容器根据PodSpec(Pod定义文件)正确运行。
Kubelet执行以下几项重要功能:
[*]Pod生命周期管理:Kubelet根据从API服务器接收到的PodSpecs创建、启动、终止容器。它负责启动Pod中的容器,并确保它们按预期运行。
[*]节点状态监控:Kubelet定期监控节点和容器的状态,并将状态报告回集群的控制平面。这使得集群中的其他组件能够做出相应的调度决策。
[*]资源管理:Kubelet负责管理分配给每个Pod的资源。这包括CPU、内存和磁盘存储资源。
[*]健康检查:Kubelet可以执行容器健康检查,并根据检查结果决定是否需要重启容器。
[*]与容器运行时的通信:Kubelet与容器运行时(如Docker、containerd等)通信,以管理容器的生命周期。
[*]秘密和配置管理:Kubelet负责将秘密、配置映射等挂载到Pod的容器中,以便应用程序可以访问这些配置。
[*]服务发现和负载均衡:尽管Kubelet本身不直接处理服务发现,但它通过设置网络规则和环境变量来支持容器内的服务发现机制。
kubelet 架构
https://img2023.cnblogs.com/blog/2344773/202311/2344773-20231107221805894-125360121.png
kubelet 的架构由 N 多的组件组成,下面简单介绍下比较重要的几个:
[*]Sync Loop: 这是Kubelet活动的核心,负责同步Pod的状态。同步循环会定期从API服务器获取PodSpecs,并确保容器的当前状态与这些规格相匹配。
[*]PodConfig: 负责将各个配置源转换成 PodSpecs,可以选择的配置源包括:Kube-apiserver、本地文件、HTTP。
[*]PLEG(Pod Lifecycle Event Generator): 负责监测和缓存Pod生命周期事件,如创建、启动或停止容器,然后将这些事件通知 Sync Loop。
[*]PodWorkers: 负责管理 Pod 的生命周期事件处理。当 Pod 生命周期事件 PLEG 检测到新的事件时,PodWorkers 会被调用来处理这些事件,包括启动新的 Pod、更新现有的 Pod、或者停止和清理不再需要的 Pod。
[*]PodManager: 存储 Pod 的期望状态,kubelet 服务的不同渠道的 Pod。
[*]ContainerRuntime: 顾名思义,容器运行时。与遵循 CRI 规范的高级容器运行时进行交互。
[*]StatsProvider: 提供节点和容器的统计信息,有 cAdvisor 和 CRI 两种实现。
[*]ProbeManager: 负责执行容器的健康检查,包括 Liveness,Startup 和 Readiness 检查。
[*]VolumeManager: 负责管理 Pod 的卷,包括挂载和卸载卷。
[*]ImageManager: 负责管理镜像,包括拉取、删除、镜像 GC 等。
[*]DeviceManager: 负责管理设备,包括 GPU、RDMA 等。
[*]PluginManager: PluginManager 运行一组异步循环,根据此节点确定哪些插件需要注册/取消注册并执行。如 CSI 驱动和设备管理器插件(Device Plugin)。
[*]CertificateManager: 处理证书轮换。
[*]OOMWatcher: 从系统日志中获取容器的 OOM 日志,将其封装成事件并记录。
流程
首先在 cmd/kubelet 中使用传入命令行参数的方式初始化配置,然后创建 pkg/kubelet 中的 Bootstrap inferface, kubelet struct 实现了这个接口, 然后调用 Run 方法启动 kubelet。
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go k.Run(podCfg.Updates())
// start the kubelet server
if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
go k.ListenAndServePodResources()
}Bootstrap
// Bootstrap is a bootstrapping interface for kubelet, targets the initialization protocol
type Bootstrap interface {
GetConfiguration() kubeletconfiginternal.KubeletConfiguration
BirthCry()
StartGarbageCollection()
ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
ListenAndServeReadOnly(address net.IP, port uint)
ListenAndServePodResources()
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
type Kubelet struct {
// ...
}首先解释一下这个函数的参数:
[*]configCh: 将配置更改的 Pod 分派给适当的处理程序回调函数
[*]plegCh: 更新运行时缓存;同步 Pod
[*]syncCh: 同步所有等待同步的 Pod
[*]housekeepingCh: 触发 Pod 的清理
[*]health manager: 同步失败的 Pod 或其中一个或多个容器的健康检查失败的 Pod
代码流程为:
[*]如果 updates channel 有消息 则使用 handler 调用对应方法做处理
[*]如果 plegCh 有消息 则使用 handler 的 HandlePodSyncs 做同步
[*]如果 syncCh 有消息 代表到了同步时间 做同步操作
[*]如果是 三种 probe 的更新 则使用 handleProbeSync 做同步
[*]如果 housekeepingCh 有消息 则使用 handler 的 HandlePodCleanups 做清理
func (kl *Kubelet) StartGarbageCollection() {
loggedContainerGCFailure := false
go wait.Until(func() {
ctx := context.Background()
if err := kl.containerGC.GarbageCollect(ctx); err != nil {
klog.ErrorS(err, "Container garbage collection failed")
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
loggedContainerGCFailure = true
} else {
var vLevel klog.Level = 4
if loggedContainerGCFailure {
vLevel = 1
loggedContainerGCFailure = false
}
klog.V(vLevel).InfoS("Container garbage collection succeeded")
}
}, ContainerGCPeriod, wait.NeverStop)
// when the high threshold is set to 100, stub the image GC manager
if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100, Disable image GC")
return
}
prevImageGCFailed := false
go wait.Until(func() {
ctx := context.Background()
if err := kl.imageManager.GarbageCollect(ctx); err != nil {
if prevImageGCFailed {
klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
// Only create an event for repeated failures
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
} else {
klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
}
prevImageGCFailed = true
} else {
var vLevel klog.Level = 4
if prevImageGCFailed {
vLevel = 1
prevImageGCFailed = false
}
klog.V(vLevel).InfoS("Image garbage collection succeeded")
}
}, ImageGCPeriod, wait.NeverStop)
}handleProbeSync也是使用 handler 的 HandlePodSyncs 做同步
handle (SyncHandler)
// RunOnce polls from one configuration update and run the associated pods.
func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) {
ctx := context.Background()
// Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil {
return nil, err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
}
}
select {
case u := <-updates:
klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay)
klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
return result, err
case <-time.After(runOnceManifestDelay):
return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)
}
}
// runOnce runs a given set of pods and returns their status.
func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
ch := make(chan RunPodResult)
admitted := []*v1.Pod{}
for _, pod := range pods {
// Check if we can admit the pod.
if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok {
kl.rejectPod(pod, reason, message)
results = append(results, RunPodResult{pod, nil})
continue
}
admitted = append(admitted, pod)
go func(pod *v1.Pod) {
err := kl.runPod(ctx, pod, retryDelay)
ch <- RunPodResult{pod, err}
}(pod)
}
klog.InfoS("Waiting for pods", "numPods", len(admitted))
failedPods := []string{}
for i := 0; i < len(admitted); i++ {
res := <-ch
results = append(results, res)
if res.Err != nil {
failedContainerName, err := kl.getFailedContainers(ctx, res.Pod)
if err != nil {
klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
} else {
klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
}
failedPods = append(failedPods, format.Pod(res.Pod))
} else {
klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
}
}
if len(failedPods) > 0 {
return results, fmt.Errorf("error running pods: %v", failedPods)
}
klog.InfoS("Pods started", "numPods", len(pods))
return results, err
}也是 kubelet struct 实现了这个接口
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
ctx := context.Background()
if kl.logServer == nil {
file := http.FileServer(http.Dir(nodeLogDir))
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
kl.logServer = http.StripPrefix("/logs/", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if nlq, errs := newNodeLogQuery(req.URL.Query()); len(errs) > 0 {
http.Error(w, errs.ToAggregate().Error(), http.StatusBadRequest)
return
} else if nlq != nil {
if req.URL.Path != "/" && req.URL.Path != "" {
http.Error(w, "path not allowed in query mode", http.StatusNotAcceptable)
return
}
if errs := nlq.validate(); len(errs) > 0 {
http.Error(w, errs.ToAggregate().Error(), http.StatusNotAcceptable)
return
}
// Validation ensures that the request does not query services and files at the same time
if len(nlq.Services) > 0 {
journal.ServeHTTP(w, req)
return
}
// Validation ensures that the request does not explicitly query multiple files at the same time
if len(nlq.Files) == 1 {
// Account for the \ being used on Windows clients
req.URL.Path = filepath.ToSlash(nlq.Files)
}
}
// Fall back in case the caller is directly trying to query a file
// Example: kubectl get --raw /api/v1/nodes/$name/proxy/logs/foo.log
file.ServeHTTP(w, req)
}))
} else {
kl.logServer = http.StripPrefix("/logs/", file)
}
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1)
}
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start two go-routines to update the status.
//
// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
// while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
// once the node becomes ready, then exits afterwards.
//
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
go kl.nodeLeaseController.Run(context.Background())
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}
// Start component sync loops.
kl.statusManager.Start()
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
// Start the pod lifecycle event generator.
kl.pleg.Start()
// Start eventedPLEG only if EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}
kl.syncLoop(ctx, updates, kl)
}可以看到这些函数基本都是把pod交给 podWorkers 去处理
podconfig
上文中的 updates channel 是从 podconfig 中获取的 那就来看看 podconfig 是怎么工作的
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}这里定义了接口 接口提中可以存储多个 source 那么有什么 source 呢
kube-apiserver
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan
页:
[1]