kubernetes client-go功能介绍

打印 上一主题 下一主题

主题 852|帖子 852|积分 2556

原味地址

https://haiyux.cc/2023/02/26/k8s-client-go/
client-go是什么?

client-go是Kubernetes官方提供的Go语言客户端库,用于与Kubernetes API服务器交互。使用client-go,您可以编写Go语言程序来创建、修改和删除Kubernetes对象,如Pod、Deployment、Service等。
作用

client-go的主要功能包括:

  • 连接Kubernetes API服务器:client-go提供了一个API客户端,用于连接Kubernetes API服务器。
  • 对象管理:client-go提供了一组API,用于创建、读取、更新和删除Kubernetes对象,如Pod、Deployment、Service等。
  • Watch API:client-go提供了一个Watch API,可以用于监视Kubernetes对象的变化。
  • 命名空间支持:client-go支持多个命名空间,并提供了一组API,用于管理命名空间。
  • 认证和授权:client-go提供了一组API,用于执行身份验证和授权,以确保只有授权的用户才能对Kubernetes对象进行操作。
client-go是使用Kubernetes API的标准方式,是Kubernetes生态系统中的重要组成部分。
api client

client-go 中包含四种client,RestClient, ClientSet,DynamicClient和DiscoveryClient。

ClientSet,DynamicClient,DiscoveryClient都是RestClient上的封装
RestClient

RestClient是最基础的客户端,它基于HTTP请求进行了封装,实现了RESTful API。使用RESTClient提供的RESTful方法,如Get()、Put()、Post()和Delete(),可以直接与API进行交互。同时,它支持JSON和Protocol Buffers,并支持所有原生资源和自定义资源定义(CRDs)。然而,为了更加优雅地处理API交互,一般需要进一步封装,通过Clientset对RESTClient进行封装,然后再对外提供接口和服务。
  1. package main
  2. import (
  3.         "context"
  4.         "fmt"
  5.         "path/filepath"
  6.         corev1 "k8s.io/api/core/v1"
  7.         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  8.         "k8s.io/client-go/kubernetes/scheme"
  9.         "k8s.io/client-go/rest"
  10.         "k8s.io/client-go/tools/clientcmd"
  11.         "k8s.io/client-go/util/homedir"
  12. )
  13. func main() {
  14.         // 使用kubeconfig生成配置
  15.         config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
  16.         if err != nil {
  17.                 panic(err)
  18.         }
  19.         config.APIPath = "api"
  20.         config.GroupVersion = &corev1.SchemeGroupVersion
  21.         config.NegotiatedSerializer = scheme.Codecs
  22.         // 生成restClient
  23.         restClient, err := rest.RESTClientFor(config)
  24.         if err != nil {
  25.                 panic(err)
  26.         }
  27.         rest := &corev1.PodList{}
  28.         if err = restClient.Get().Namespace("default").Resource("pods").VersionedParams(&metav1.ListOptions{},
  29.                 scheme.ParameterCodec).Do(context.TODO()).Into(rest); err != nil {
  30.                 panic(err)
  31.         }
  32.         for _, v := range rest.Items {
  33.                 fmt.Printf("NameSpace: %v  Name: %v  Status: %v \n", v.Namespace, v.Name, v.Status.Phase)
  34.         }
  35. }
  36. /*
  37. 结果
  38. NameSpace: default  Name: nginx-76d6c9b8c-8ljkt  Status: Running
  39. NameSpace: default  Name: nginx-76d6c9b8c-jqv9h  Status: Running
  40. NameSpace: default  Name: nginx-76d6c9b8c-kr9d2  Status: Running
  41. NameSpace: default  Name: nginx-76d6c9b8c-m4g5l  Status: Running
  42. NameSpace: default  Name: nginx-76d6c9b8c-n8st9  Status: Running
  43. */
复制代码
ClientSet

ClientSet是在RestClient的基础上封装了对资源和版本的管理方法。资源可以理解为一个客户端,而ClientSet是多个客户端的集合。在操作资源对象时,需要指定Group和Version,然后根据资源获取。然而,ClientSet不支持自定义资源定义(CRDs),但使用kubebuilder生成代码时,会生成相应的ClientSet。
  1. package main
  2. import (
  3.         "context"
  4.         "fmt"
  5.         "path/filepath"
  6.         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7.         "k8s.io/client-go/kubernetes"
  8.         "k8s.io/client-go/tools/clientcmd"
  9.         "k8s.io/client-go/util/homedir"
  10. )
  11. func main() {
  12.         ctx := context.Background()
  13.         // 使用kubeconfig生成配置 ~/.kube/config
  14.         config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
  15.         if err != nil {
  16.                 panic(err)
  17.         }
  18.         // 生成clientSet
  19.         clientSet, err := kubernetes.NewForConfig(config)
  20.         if err != nil {
  21.                 panic(err)
  22.         }
  23.         nodeList, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
  24.         if err != nil {
  25.                 panic(err)
  26.         }
  27.         for _, node := range nodeList.Items {
  28.                 fmt.Printf("nodeName: %v, status: %v \n", node.GetName(), node.GetCreationTimestamp())
  29.         }
  30.   // pod 是有namespace资源所以指定namespace 而node没有
  31.         pods, err := clientSet.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
  32.         if err != nil {
  33.                 panic(err)
  34.         }
  35.         for _, v := range pods.Items {
  36.                 fmt.Printf("namespace: %v podName: %v status: %v \n", v.Namespace, v.Name, v.Status.Phase)
  37.         }
  38. }
  39. /*
  40. 结果:
  41. nodeName: minikube, status: 2023-01-27 18:45:35 +0800 CST
  42. nodeName: minikube-m02, status: 2023-02-26 21:19:30 +0800 CST
  43. nodeName: minikube-m03, status: 2023-02-26 21:19:38 +0800 CST
  44. namespace: default podName: nginx-76d6c9b8c-8ljkt status: Running
  45. namespace: default podName: nginx-76d6c9b8c-jqv9h status: Running
  46. namespace: default podName: nginx-76d6c9b8c-kr9d2 status: Running
  47. namespace: default podName: nginx-76d6c9b8c-m4g5l status: Running
  48. namespace: default podName: nginx-76d6c9b8c-n8st9 status: Running
  49. */
复制代码
DynamicClient

DynamicClient是一种动态客户端,它可以对任何资源进行RESTful操作,包括自定义资源定义(CRD)。与ClientSet不同,DynamicClient返回的对象是一个map[string]interface{}。如果一个控制器需要控制所有的API,可以使用DynamicClient。目前,DynamicClient在垃圾回收器和命名空间控制器中被广泛使用。
DynamicClient的处理过程将Resource(例如PodList)转换为unstructured类型。Kubernetes的所有资源都可以转换为这个结构类型。处理完毕后,再将其转换回PodList。整个转换过程类似于接口转换,即通过interface{}的断言实现。
DynamicClient是一种动态的客户端,它能处理Kubernetes所有的资源,但仅支持JSON
  1. package main
  2. import (
  3.         "context"
  4.         "fmt"
  5.         "path/filepath"
  6.         apiv1 "k8s.io/api/core/v1"
  7.         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  8.         "k8s.io/apimachinery/pkg/runtime"
  9.         "k8s.io/apimachinery/pkg/runtime/schema"
  10.         "k8s.io/client-go/dynamic"
  11.         "k8s.io/client-go/tools/clientcmd"
  12.         "k8s.io/client-go/util/homedir"
  13. )
  14. func main() {
  15.         ctx := context.Background()
  16.         // 使用kubeconfig生成配置 ~/.kube/config
  17.         config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
  18.         if err != nil {
  19.                 panic(err)
  20.         }
  21.         // dynamicClient
  22.         dynamicClient, err := dynamic.NewForConfig(config)
  23.         if err != nil {
  24.                 panic(err)
  25.         }
  26.         // 定义组版本资源
  27.         gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
  28.         unStructObj, err := dynamicClient.Resource(gvr).Namespace("default").List(ctx, metav1.ListOptions{})
  29.         if err != nil {
  30.                 panic(err)
  31.         }
  32.         podList := &apiv1.PodList{}
  33.         if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unStructObj.UnstructuredContent(), podList); err != nil {
  34.                 panic(err)
  35.         }
  36.         for _, v := range podList.Items {
  37.                 fmt.Printf("namespaces:%v  podName:%v status:%v \n", v.Namespace, v.Name, v.Status.Phase)
  38.         }
  39. }
  40. /*
  41. namespaces:default  podName:nginx-76d6c9b8c-8ljkt status:Running
  42. namespaces:default  podName:nginx-76d6c9b8c-jqv9h status:Running
  43. namespaces:default  podName:nginx-76d6c9b8c-kr9d2 status:Running
  44. namespaces:default  podName:nginx-76d6c9b8c-m4g5l status:Running
  45. namespaces:default  podName:nginx-76d6c9b8c-n8st9 status:Running
  46. */
复制代码
其中,GVR(group,version,resource) 用于标识 Kubernetes API 中的资源类型,其中 Group 表示 API 群组,Version 表示 API 版本,Resource 表示资源类型。例如,Deployment 的 GVR 为 "apps/v1/deployments",其中 "apps" 是 API 群组,"v1" 是 API 版本,"deployments" 是资源类型。
DiscoveryClient

DiscoveryClient 是一个发现客户端,它的主要作用是用于发现 API Server 支持的资源组、资源版本和资源信息。在 Kubernetes 中,API Server 支持很多资源组、资源版本和资源信息,我们可以通过使用 DiscoveryClient 来查看这些信息。此外,kubectl 的 API 版本和 API 资源也是通过 DiscoveryClient 来实现的。我们还可以将这些信息缓存到本地,以减轻 API 访问的压力。缓存文件默认存储在 ./kube/cache 和 ./kube/http-cache 目录下。
  1. package main
  2. import (
  3.         "fmt"
  4.         "path/filepath"
  5.         "k8s.io/apimachinery/pkg/runtime/schema"
  6.         "k8s.io/client-go/discovery"
  7.         "k8s.io/client-go/tools/clientcmd"
  8.         "k8s.io/client-go/util/homedir"
  9. )
  10. func main() {
  11.         // 使用kubeconfig生成配置 ~/.kube/config
  12.         config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
  13.         if err != nil {
  14.                 panic(err)
  15.         }
  16.         // 生成discoverClient
  17.         discoverClient, err := discovery.NewDiscoveryClientForConfig(config)
  18.         if err != nil {
  19.                 panic(err)
  20.         }
  21.         _, apiResourceList, err := discoverClient.ServerGroupsAndResources()
  22.         for _, v := range apiResourceList {
  23.                 gv, err := schema.ParseGroupVersion(v.GroupVersion)
  24.                 if err != nil {
  25.                         panic(err)
  26.                 }
  27.                 for _, resource := range v.APIResources {
  28.                         fmt.Printf("name:%v group:%v version:%v\n", resource.Name, gv.Group, gv.Version)
  29.                 }
  30.         }
  31. }
  32. /*
  33. name:bindings group: version:v1
  34. name:componentstatuses group: version:v1
  35. name:configmaps group: version:v1
  36. name:endpoints group: version:v1
  37. name:events group: version:v1
  38. name:limitranges group: version:v1
  39. name:namespaces group: version:v1
  40. name:namespaces/finalize group: version:v1
  41. name:namespaces/status group: version:v1
  42. name:nodes group: version:v1
  43. name:nodes/proxy group: version:v1
  44. name:nodes/status group: version:v1
  45. name:persistentvolumeclaims group: version:v1
  46. name:persistentvolumeclaims/status group: version:v1
  47. name:persistentvolumes group: version:v1
  48. name:persistentvolumes/status group: version:v1
  49. name:pods group: version:v1
  50. name:pods/attach group: version:v1
  51. name:pods/binding group: version:v1
  52. name:pods/ephemeralcontainers group: version:v1
  53. name:pods/eviction group: version:v1
  54. name:pods/exec group: version:v1
  55. name:pods/log group: version:v1
  56. name:pods/portforward group: version:v1
  57. name:pods/proxy group: version:v1
  58. name:pods/status group: version:v1
  59. name:podtemplates group: version:v1
  60. name:replicationcontrollers group: version:v1
  61. name:replicationcontrollers/scale group: version:v1
  62. name:replicationcontrollers/status group: version:v1
  63. name:resourcequotas group: version:v1
  64. name:resourcequotas/status group: version:v1
  65. name:secrets group: version:v1
  66. name:serviceaccounts group: version:v1
  67. name:serviceaccounts/token group: version:v1
  68. name:services group: version:v1
  69. name:services/proxy group: version:v1
  70. name:services/status group: version:v1
  71. name:apiservices group:apiregistration.k8s.io version:v1
  72. name:apiservices/status group:apiregistration.k8s.io version:v1
  73. name:controllerrevisions group:apps version:v1
  74. name:daemonsets group:apps version:v1
  75. name:daemonsets/status group:apps version:v1
  76. name:deployments group:apps version:v1
  77. name:deployments/scale group:apps version:v1
  78. name:deployments/status group:apps version:v1
  79. name:replicasets group:apps version:v1
  80. name:replicasets/scale group:apps version:v1
  81. name:replicasets/status group:apps version:v1
  82. name:statefulsets group:apps version:v1
  83. name:statefulsets/scale group:apps version:v1
  84. name:statefulsets/status group:apps version:v1
  85. name:events group:events.k8s.io version:v1
  86. name:tokenreviews group:authentication.k8s.io version:v1
  87. name:localsubjectaccessreviews group:authorization.k8s.io version:v1
  88. name:selfsubjectaccessreviews group:authorization.k8s.io version:v1
  89. name:selfsubjectrulesreviews group:authorization.k8s.io version:v1
  90. name:subjectaccessreviews group:authorization.k8s.io version:v1
  91. name:horizontalpodautoscalers group:autoscaling version:v2
  92. name:horizontalpodautoscalers/status group:autoscaling version:v2
  93. name:horizontalpodautoscalers group:autoscaling version:v1
  94. name:horizontalpodautoscalers/status group:autoscaling version:v1
  95. name:horizontalpodautoscalers group:autoscaling version:v2beta2
  96. name:horizontalpodautoscalers/status group:autoscaling version:v2beta2
  97. name:cronjobs group:batch version:v1
  98. name:cronjobs/status group:batch version:v1
  99. name:jobs group:batch version:v1
  100. name:jobs/status group:batch version:v1
  101. name:certificatesigningrequests group:certificates.k8s.io version:v1
  102. name:certificatesigningrequests/approval group:certificates.k8s.io version:v1
  103. name:certificatesigningrequests/status group:certificates.k8s.io version:v1
  104. name:ingressclasses group:networking.k8s.io version:v1
  105. name:ingresses group:networking.k8s.io version:v1
  106. name:ingresses/status group:networking.k8s.io version:v1
  107. name:networkpolicies group:networking.k8s.io version:v1
  108. name:networkpolicies/status group:networking.k8s.io version:v1
  109. name:poddisruptionbudgets group:policy version:v1
  110. name:poddisruptionbudgets/status group:policy version:v1
  111. name:clusterrolebindings group:rbac.authorization.k8s.io version:v1
  112. name:clusterroles group:rbac.authorization.k8s.io version:v1
  113. name:rolebindings group:rbac.authorization.k8s.io version:v1
  114. name:roles group:rbac.authorization.k8s.io version:v1
  115. name:csidrivers group:storage.k8s.io version:v1
  116. name:csinodes group:storage.k8s.io version:v1
  117. name:csistoragecapacities group:storage.k8s.io version:v1
  118. name:storageclasses group:storage.k8s.io version:v1
  119. name:volumeattachments group:storage.k8s.io version:v1
  120. name:volumeattachments/status group:storage.k8s.io version:v1
  121. name:csistoragecapacities group:storage.k8s.io version:v1beta1
  122. name:mutatingwebhookconfigurations group:admissionregistration.k8s.io version:v1
  123. name:validatingwebhookconfigurations group:admissionregistration.k8s.io version:v1
  124. name:customresourcedefinitions group:apiextensions.k8s.io version:v1
  125. name:customresourcedefinitions/status group:apiextensions.k8s.io version:v1
  126. name:priorityclasses group:scheduling.k8s.io version:v1
  127. name:leases group:coordination.k8s.io version:v1
  128. name:runtimeclasses group:node.k8s.io version:v1
  129. name:endpointslices group:discovery.k8s.io version:v1
  130. name:flowschemas group:flowcontrol.apiserver.k8s.io version:v1beta2
  131. name:flowschemas/status group:flowcontrol.apiserver.k8s.io version:v1beta2
  132. name:prioritylevelconfigurations group:flowcontrol.apiserver.k8s.io version:v1beta2
  133. name:prioritylevelconfigurations/status group:flowcontrol.apiserver.k8s.io version:v1beta2
  134. name:flowschemas group:flowcontrol.apiserver.k8s.io version:v1beta1
  135. name:flowschemas/status group:flowcontrol.apiserver.k8s.io version:v1beta1
  136. name:prioritylevelconfigurations group:flowcontrol.apiserver.k8s.io version:v1beta1
  137. name:prioritylevelconfigurations/status group:flowcontrol.apiserver.k8s.io version:v1beta1
  138. name:nodes group:metrics.k8s.io version:v1beta1
  139. name:pods group:metrics.k8s.io version:v1beta1
  140. */
复制代码
informer indexer  lister机制


上图展示了自定义控制器的工作方式。在虚线上方,是client-go包的informer和indexer工作方式。informer负责监听Kubernetes API资源对象的变化,如创建、更新、删除等操作,并将这些变化通知给indexer进行索引和缓存。而indexer则是将API对象进行索引,以便在需要时快速地访问它们。lister则是对indexer的封装,提供了一种简单的方式来获取已经索引的对象列表,以供代码中的其他部分使用。这种分层结构的设计使得client-go可以高效地处理Kubernetes资源对象的变化,并在应用程序中方便地使用这些资源对象。
informer

Informer是Kubernetes API客户端中一种重要的机制,它可以实现对资源对象的监视和事件通知。当Kubernetes集群中的资源对象发生变化时,Informer可以及时地获取到这些变化,并将这些变化以事件的形式通知给相关的监听器。Informer通过调用API Server提供的REST接口,以及Kubernetes中定义的watch机制,实现了对集群资源对象的全面监视。
下面是一个简单的pod informer示例,用于监控所有pod的变化并将其放入队列中,worker从队列中取出pod并打印相关信息。
[code]package mainimport (        "fmt"        "os"        "os/signal"        "syscall"        "time"        v1 "k8s.io/api/core/v1"        "k8s.io/apimachinery/pkg/util/wait"        "k8s.io/client-go/informers"        "k8s.io/client-go/kubernetes"        "k8s.io/client-go/tools/cache"        "k8s.io/client-go/tools/clientcmd"        "k8s.io/client-go/util/workqueue")func main() {        // 获取 kubeconfig 文件路径        kubeconfigPath := os.Getenv("KUBECONFIG")        if kubeconfigPath == "" {                kubeconfigPath = os.Getenv("HOME") + "/.kube/config"        }        // 使用 kubeconfig 文件创建 kubernetes 客户端        config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)        if err != nil {                panic(err)        }        clientset, err := kubernetes.NewForConfig(config)        if err != nil {                panic(err)        }        // 创建 informer 工厂        informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute)        // 创建 informer 对象        podInformer := informerFactory.Core().V1().Pods()        // 创建工作队列        queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())        // 定义处理新增、更新和删除事件的回调函数        podHandler := cache.ResourceEventHandlerFuncs{                AddFunc: func(obj interface{}) {                        key, err := cache.MetaNamespaceKeyFunc(obj)                        if err == nil {                                queue.Add(key)                        }                },                UpdateFunc: func(oldObj, newObj interface{}) {                        key, err := cache.MetaNamespaceKeyFunc(newObj)                        if err == nil {                                queue.Add(key)                        }                },                DeleteFunc: func(obj interface{}) {                        key, err := cache.MetaNamespaceKeyFunc(obj)                        if err == nil {                                queue.Add(key)                        }                },        }        // 将回调函数注册到 informer 上        podInformer.Informer().AddEventHandler(podHandler)        // 启动 informer        stopCh := make(chan struct{})        defer close(stopCh)        informerFactory.Start(stopCh)        // 等待 informer 同步完成        if !cache.WaitForCacheSync(stopCh) {                panic("同步 informer 缓存失败")        }        // 创建信号处理程序,用于捕捉 SIGTERM 和 SIGINT 信号        signalCh := make(chan os.Signal, 1)        signal.Notify(signalCh, syscall.SIGTERM, syscall.SIGINT)        // 创建 worker 函数,用于处理队列中的事件        processNextItem := func() {                obj, shutdown := queue.Get()                if shutdown {                        return                }                // 转换对象为 Pod                key := obj.(string)                podObj, exists, err := podInformer.Informer().GetIndexer().GetByKey(key)                if err != nil {                        queue.Forget(obj)                        panic(fmt.Sprintf("获取 Pod 失败:%v", err))                }                if !exists {                        // 如果对象已经被删除,就把它从队列中移除                        queue.Forget(obj)                        return                }                // 在这里添加处理 Pod 的逻辑                pod := podObj.(*v1.Pod)                fmt.Printf("处理 Pod: namespace:%v,podName:%v\n", pod.Namespace, pod.Name)                // 处理完事件后,把它从队列中移除                queue.Forget(obj)                return        }        // 启动 worker        go wait.Until(processNextItem, time.Second, stopCh)        // 等待信号

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

悠扬随风

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表