原味地址
https://haiyux.cc/2023/02/26/k8s-client-go/
client-go是什么?
client-go是Kubernetes官方提供的Go语言客户端库,用于与Kubernetes API服务器交互。使用client-go,您可以编写Go语言程序来创建、修改和删除Kubernetes对象,如Pod、Deployment、Service等。
作用
client-go的主要功能包括:
- 连接Kubernetes API服务器:client-go提供了一个API客户端,用于连接Kubernetes API服务器。
- 对象管理:client-go提供了一组API,用于创建、读取、更新和删除Kubernetes对象,如Pod、Deployment、Service等。
- Watch API:client-go提供了一个Watch API,可以用于监视Kubernetes对象的变化。
- 命名空间支持:client-go支持多个命名空间,并提供了一组API,用于管理命名空间。
- 认证和授权:client-go提供了一组API,用于执行身份验证和授权,以确保只有授权的用户才能对Kubernetes对象进行操作。
client-go是使用Kubernetes API的标准方式,是Kubernetes生态系统中的重要组成部分。
api client
client-go 中包含四种client,RestClient, ClientSet,DynamicClient和DiscoveryClient。

ClientSet,DynamicClient,DiscoveryClient都是RestClient上的封装
RestClient
RestClient是最基础的客户端,它基于HTTP请求进行了封装,实现了RESTful API。使用RESTClient提供的RESTful方法,如Get()、Put()、Post()和Delete(),可以直接与API进行交互。同时,它支持JSON和Protocol Buffers,并支持所有原生资源和自定义资源定义(CRDs)。然而,为了更加优雅地处理API交互,一般需要进一步封装,通过Clientset对RESTClient进行封装,然后再对外提供接口和服务。- package main
- import (
- "context"
- "fmt"
- "path/filepath"
- corev1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes/scheme"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/util/homedir"
- )
- func main() {
- // 使用kubeconfig生成配置
- config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
- if err != nil {
- panic(err)
- }
- config.APIPath = "api"
- config.GroupVersion = &corev1.SchemeGroupVersion
- config.NegotiatedSerializer = scheme.Codecs
- // 生成restClient
- restClient, err := rest.RESTClientFor(config)
- if err != nil {
- panic(err)
- }
- rest := &corev1.PodList{}
- if err = restClient.Get().Namespace("default").Resource("pods").VersionedParams(&metav1.ListOptions{},
- scheme.ParameterCodec).Do(context.TODO()).Into(rest); err != nil {
- panic(err)
- }
- for _, v := range rest.Items {
- fmt.Printf("NameSpace: %v Name: %v Status: %v \n", v.Namespace, v.Name, v.Status.Phase)
- }
- }
- /*
- 结果
- NameSpace: default Name: nginx-76d6c9b8c-8ljkt Status: Running
- NameSpace: default Name: nginx-76d6c9b8c-jqv9h Status: Running
- NameSpace: default Name: nginx-76d6c9b8c-kr9d2 Status: Running
- NameSpace: default Name: nginx-76d6c9b8c-m4g5l Status: Running
- NameSpace: default Name: nginx-76d6c9b8c-n8st9 Status: Running
- */
复制代码 ClientSet
ClientSet是在RestClient的基础上封装了对资源和版本的管理方法。资源可以理解为一个客户端,而ClientSet是多个客户端的集合。在操作资源对象时,需要指定Group和Version,然后根据资源获取。然而,ClientSet不支持自定义资源定义(CRDs),但使用kubebuilder生成代码时,会生成相应的ClientSet。- package main
- import (
- "context"
- "fmt"
- "path/filepath"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/util/homedir"
- )
- func main() {
- ctx := context.Background()
- // 使用kubeconfig生成配置 ~/.kube/config
- config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
- if err != nil {
- panic(err)
- }
- // 生成clientSet
- clientSet, err := kubernetes.NewForConfig(config)
- if err != nil {
- panic(err)
- }
- nodeList, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
- if err != nil {
- panic(err)
- }
- for _, node := range nodeList.Items {
- fmt.Printf("nodeName: %v, status: %v \n", node.GetName(), node.GetCreationTimestamp())
- }
- // pod 是有namespace资源所以指定namespace 而node没有
- pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
- if err != nil {
- panic(err)
- }
- for _, v := range pods.Items {
- fmt.Printf("namespace: %v podName: %v status: %v \n", v.Namespace, v.Name, v.Status.Phase)
- }
- }
- /*
- 结果:
- nodeName: minikube, status: 2023-01-27 18:45:35 +0800 CST
- nodeName: minikube-m02, status: 2023-02-26 21:19:30 +0800 CST
- nodeName: minikube-m03, status: 2023-02-26 21:19:38 +0800 CST
- namespace: default podName: nginx-76d6c9b8c-8ljkt status: Running
- namespace: default podName: nginx-76d6c9b8c-jqv9h status: Running
- namespace: default podName: nginx-76d6c9b8c-kr9d2 status: Running
- namespace: default podName: nginx-76d6c9b8c-m4g5l status: Running
- namespace: default podName: nginx-76d6c9b8c-n8st9 status: Running
- */
复制代码 DynamicClient
DynamicClient是一种动态客户端,它可以对任何资源进行RESTful操作,包括自定义资源定义(CRD)。与ClientSet不同,DynamicClient返回的对象是一个map[string]interface{}。如果一个控制器需要控制所有的API,可以使用DynamicClient。目前,DynamicClient在垃圾回收器和命名空间控制器中被广泛使用。
DynamicClient的处理过程将Resource(例如PodList)转换为unstructured类型。Kubernetes的所有资源都可以转换为这个结构类型。处理完毕后,再将其转换回PodList。整个转换过程类似于接口转换,即通过interface{}的断言实现。
DynamicClient是一种动态的客户端,它能处理Kubernetes所有的资源,但仅支持JSON- package main
- import (
- "context"
- "fmt"
- "path/filepath"
- apiv1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/client-go/dynamic"
- "k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/util/homedir"
- )
- func main() {
- ctx := context.Background()
- // 使用kubeconfig生成配置 ~/.kube/config
- config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
- if err != nil {
- panic(err)
- }
- // dynamicClient
- dynamicClient, err := dynamic.NewForConfig(config)
- if err != nil {
- panic(err)
- }
- // 定义组版本资源
- gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
- unStructObj, err := dynamicClient.Resource(gvr).Namespace("default").List(ctx, metav1.ListOptions{})
- if err != nil {
- panic(err)
- }
- podList := &apiv1.PodList{}
- if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unStructObj.UnstructuredContent(), podList); err != nil {
- panic(err)
- }
- for _, v := range podList.Items {
- fmt.Printf("namespaces:%v podName:%v status:%v \n", v.Namespace, v.Name, v.Status.Phase)
- }
- }
- /*
- namespaces:default podName:nginx-76d6c9b8c-8ljkt status:Running
- namespaces:default podName:nginx-76d6c9b8c-jqv9h status:Running
- namespaces:default podName:nginx-76d6c9b8c-kr9d2 status:Running
- namespaces:default podName:nginx-76d6c9b8c-m4g5l status:Running
- namespaces:default podName:nginx-76d6c9b8c-n8st9 status:Running
- */
复制代码 其中,GVR(group,version,resource) 用于标识 Kubernetes API 中的资源类型,其中 Group 表示 API 群组,Version 表示 API 版本,Resource 表示资源类型。例如,Deployment 的 GVR 为 "apps/v1/deployments",其中 "apps" 是 API 群组,"v1" 是 API 版本,"deployments" 是资源类型。
DiscoveryClient
DiscoveryClient 是一个发现客户端,它的主要作用是用于发现 API Server 支持的资源组、资源版本和资源信息。在 Kubernetes 中,API Server 支持很多资源组、资源版本和资源信息,我们可以通过使用 DiscoveryClient 来查看这些信息。此外,kubectl 的 API 版本和 API 资源也是通过 DiscoveryClient 来实现的。我们还可以将这些信息缓存到本地,以减轻 API 访问的压力。缓存文件默认存储在 ./kube/cache 和 ./kube/http-cache 目录下。- package main
- import (
- "fmt"
- "path/filepath"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/client-go/discovery"
- "k8s.io/client-go/tools/clientcmd"
- "k8s.io/client-go/util/homedir"
- )
- func main() {
- // 使用kubeconfig生成配置 ~/.kube/config
- config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
- if err != nil {
- panic(err)
- }
- // 生成discoverClient
- discoverClient, err := discovery.NewDiscoveryClientForConfig(config)
- if err != nil {
- panic(err)
- }
- _, apiResourceList, err := discoverClient.ServerGroupsAndResources()
- for _, v := range apiResourceList {
- gv, err := schema.ParseGroupVersion(v.GroupVersion)
- if err != nil {
- panic(err)
- }
- for _, resource := range v.APIResources {
- fmt.Printf("name:%v group:%v version:%v\n", resource.Name, gv.Group, gv.Version)
- }
- }
- }
- /*
- name:bindings group: version:v1
- name:componentstatuses group: version:v1
- name:configmaps group: version:v1
- name:endpoints group: version:v1
- name:events group: version:v1
- name:limitranges group: version:v1
- name:namespaces group: version:v1
- name:namespaces/finalize group: version:v1
- name:namespaces/status group: version:v1
- name:nodes group: version:v1
- name:nodes/proxy group: version:v1
- name:nodes/status group: version:v1
- name:persistentvolumeclaims group: version:v1
- name:persistentvolumeclaims/status group: version:v1
- name:persistentvolumes group: version:v1
- name:persistentvolumes/status group: version:v1
- name:pods group: version:v1
- name:pods/attach group: version:v1
- name:pods/binding group: version:v1
- name:pods/ephemeralcontainers group: version:v1
- name:pods/eviction group: version:v1
- name:pods/exec group: version:v1
- name:pods/log group: version:v1
- name:pods/portforward group: version:v1
- name:pods/proxy group: version:v1
- name:pods/status group: version:v1
- name:podtemplates group: version:v1
- name:replicationcontrollers group: version:v1
- name:replicationcontrollers/scale group: version:v1
- name:replicationcontrollers/status group: version:v1
- name:resourcequotas group: version:v1
- name:resourcequotas/status group: version:v1
- name:secrets group: version:v1
- name:serviceaccounts group: version:v1
- name:serviceaccounts/token group: version:v1
- name:services group: version:v1
- name:services/proxy group: version:v1
- name:services/status group: version:v1
- name:apiservices group:apiregistration.k8s.io version:v1
- name:apiservices/status group:apiregistration.k8s.io version:v1
- name:controllerrevisions group:apps version:v1
- name:daemonsets group:apps version:v1
- name:daemonsets/status group:apps version:v1
- name:deployments group:apps version:v1
- name:deployments/scale group:apps version:v1
- name:deployments/status group:apps version:v1
- name:replicasets group:apps version:v1
- name:replicasets/scale group:apps version:v1
- name:replicasets/status group:apps version:v1
- name:statefulsets group:apps version:v1
- name:statefulsets/scale group:apps version:v1
- name:statefulsets/status group:apps version:v1
- name:events group:events.k8s.io version:v1
- name:tokenreviews group:authentication.k8s.io version:v1
- name:localsubjectaccessreviews group:authorization.k8s.io version:v1
- name:selfsubjectaccessreviews group:authorization.k8s.io version:v1
- name:selfsubjectrulesreviews group:authorization.k8s.io version:v1
- name:subjectaccessreviews group:authorization.k8s.io version:v1
- name:horizontalpodautoscalers group:autoscaling version:v2
- name:horizontalpodautoscalers/status group:autoscaling version:v2
- name:horizontalpodautoscalers group:autoscaling version:v1
- name:horizontalpodautoscalers/status group:autoscaling version:v1
- name:horizontalpodautoscalers group:autoscaling version:v2beta2
- name:horizontalpodautoscalers/status group:autoscaling version:v2beta2
- name:cronjobs group:batch version:v1
- name:cronjobs/status group:batch version:v1
- name:jobs group:batch version:v1
- name:jobs/status group:batch version:v1
- name:certificatesigningrequests group:certificates.k8s.io version:v1
- name:certificatesigningrequests/approval group:certificates.k8s.io version:v1
- name:certificatesigningrequests/status group:certificates.k8s.io version:v1
- name:ingressclasses group:networking.k8s.io version:v1
- name:ingresses group:networking.k8s.io version:v1
- name:ingresses/status group:networking.k8s.io version:v1
- name:networkpolicies group:networking.k8s.io version:v1
- name:networkpolicies/status group:networking.k8s.io version:v1
- name:poddisruptionbudgets group:policy version:v1
- name:poddisruptionbudgets/status group:policy version:v1
- name:clusterrolebindings group:rbac.authorization.k8s.io version:v1
- name:clusterroles group:rbac.authorization.k8s.io version:v1
- name:rolebindings group:rbac.authorization.k8s.io version:v1
- name:roles group:rbac.authorization.k8s.io version:v1
- name:csidrivers group:storage.k8s.io version:v1
- name:csinodes group:storage.k8s.io version:v1
- name:csistoragecapacities group:storage.k8s.io version:v1
- name:storageclasses group:storage.k8s.io version:v1
- name:volumeattachments group:storage.k8s.io version:v1
- name:volumeattachments/status group:storage.k8s.io version:v1
- name:csistoragecapacities group:storage.k8s.io version:v1beta1
- name:mutatingwebhookconfigurations group:admissionregistration.k8s.io version:v1
- name:validatingwebhookconfigurations group:admissionregistration.k8s.io version:v1
- name:customresourcedefinitions group:apiextensions.k8s.io version:v1
- name:customresourcedefinitions/status group:apiextensions.k8s.io version:v1
- name:priorityclasses group:scheduling.k8s.io version:v1
- name:leases group:coordination.k8s.io version:v1
- name:runtimeclasses group:node.k8s.io version:v1
- name:endpointslices group:discovery.k8s.io version:v1
- name:flowschemas group:flowcontrol.apiserver.k8s.io version:v1beta2
- name:flowschemas/status group:flowcontrol.apiserver.k8s.io version:v1beta2
- name:prioritylevelconfigurations group:flowcontrol.apiserver.k8s.io version:v1beta2
- name:prioritylevelconfigurations/status group:flowcontrol.apiserver.k8s.io version:v1beta2
- name:flowschemas group:flowcontrol.apiserver.k8s.io version:v1beta1
- name:flowschemas/status group:flowcontrol.apiserver.k8s.io version:v1beta1
- name:prioritylevelconfigurations group:flowcontrol.apiserver.k8s.io version:v1beta1
- name:prioritylevelconfigurations/status group:flowcontrol.apiserver.k8s.io version:v1beta1
- name:nodes group:metrics.k8s.io version:v1beta1
- name:pods group:metrics.k8s.io version:v1beta1
- */
复制代码 informer indexer lister机制

上图展示了自定义控制器的工作方式。在虚线上方,是client-go包的informer和indexer工作方式。informer负责监听Kubernetes API资源对象的变化,如创建、更新、删除等操作,并将这些变化通知给indexer进行索引和缓存。而indexer则是将API对象进行索引,以便在需要时快速地访问它们。lister则是对indexer的封装,提供了一种简单的方式来获取已经索引的对象列表,以供代码中的其他部分使用。这种分层结构的设计使得client-go可以高效地处理Kubernetes资源对象的变化,并在应用程序中方便地使用这些资源对象。
informer
Informer是Kubernetes API客户端中一种重要的机制,它可以实现对资源对象的监视和事件通知。当Kubernetes集群中的资源对象发生变化时,Informer可以及时地获取到这些变化,并将这些变化以事件的形式通知给相关的监听器。Informer通过调用API Server提供的REST接口,以及Kubernetes中定义的watch机制,实现了对集群资源对象的全面监视。
下面是一个简单的pod informer示例,用于监控所有pod的变化并将其放入队列中,worker从队列中取出pod并打印相关信息。
[code]package mainimport ( "fmt" "os" "os/signal" "syscall" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue")func main() { // 获取 kubeconfig 文件路径 kubeconfigPath := os.Getenv("KUBECONFIG") if kubeconfigPath == "" { kubeconfigPath = os.Getenv("HOME") + "/.kube/config" } // 使用 kubeconfig 文件创建 kubernetes 客户端 config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { panic(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } // 创建 informer 工厂 informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute) // 创建 informer 对象 podInformer := informerFactory.Core().V1().Pods() // 创建工作队列 queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) // 定义处理新增、更新和删除事件的回调函数 podHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, UpdateFunc: func(oldObj, newObj interface{}) { key, err := cache.MetaNamespaceKeyFunc(newObj) if err == nil { queue.Add(key) } }, DeleteFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, } // 将回调函数注册到 informer 上 podInformer.Informer().AddEventHandler(podHandler) // 启动 informer stopCh := make(chan struct{}) defer close(stopCh) informerFactory.Start(stopCh) // 等待 informer 同步完成 if !cache.WaitForCacheSync(stopCh) { panic("同步 informer 缓存失败") } // 创建信号处理程序,用于捕捉 SIGTERM 和 SIGINT 信号 signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, syscall.SIGTERM, syscall.SIGINT) // 创建 worker 函数,用于处理队列中的事件 processNextItem := func() { obj, shutdown := queue.Get() if shutdown { return } // 转换对象为 Pod key := obj.(string) podObj, exists, err := podInformer.Informer().GetIndexer().GetByKey(key) if err != nil { queue.Forget(obj) panic(fmt.Sprintf("获取 Pod 失败:%v", err)) } if !exists { // 如果对象已经被删除,就把它从队列中移除 queue.Forget(obj) return } // 在这里添加处理 Pod 的逻辑 pod := podObj.(*v1.Pod) fmt.Printf("处理 Pod: namespace:%v,podName:%v\n", pod.Namespace, pod.Name) // 处理完事件后,把它从队列中移除 queue.Forget(obj) return } // 启动 worker go wait.Until(processNextItem, time.Second, stopCh) // 等待信号 |