ToB企服应用市场:ToB评测及商务社交产业平台

标题: 深入解析Kubernetes admission webhooks [打印本页]

作者: 忿忿的泥巴坨    时间: 2022-8-25 19:17
标题: 深入解析Kubernetes admission webhooks
BACKGROUND

admission controllers的特点
下图,显示了用户操作资源的流程,可以看出 admission controllers 作用是在通过身份验证资源持久化之前起到拦截作用。在准入控制器的加入会使kubernetes增加了更高级的安全功能。

图:Kubernetes API 请求的请求处理步骤图Source:https://kubernetes.io/blog/2019/03/21/a-guide-to-kubernetes-admission-controllers/这里找到一个大佬博客画的图,通过两张图可以很清晰的了解到admission webhook流程,与官方给出的不一样的地方在于,这里清楚地定位了kubernetes admission webhook 处于准入控制中,RBAC之后,push 之前。

图:Kubernetes API 请求的请求处理步骤图(详细)Source:https://www.armosec.io/blog/kubernetes-admission-controller/两种控制器有什么区别?

根据官方提供的说法是
Mutating controllers may modify related objects to the requests they admit; validating controllers may not
从结构图中也可以看出,validating  是在持久化之前,而 Mutating  是在结构验证前,根据这些特性我们可以使用 Mutating 修改这个资源对象内容(如增加验证的信息),在 validating 中验证是否合法。
composition of admission controllers

kubernetes中的  admission controllers 由两部分组成:
Mutating 控制器可以修改他们处理的资源对象,Validating 控制器不会。当在任何一个阶段中的任何控制器拒绝这个了请求,则会立即拒绝整个请求,并将错误返回。
admission webhook

由于准入控制器是内置在 kube-apiserver 中的,这种情况下就限制了admission controller的可扩展性。在这种背景下,kubernetes提供了一种可扩展的准入控制器 extensible admission controllers,这种行为叫做动态准入控制 Dynamic Admission Control,而提供这个功能的就是 admission webhook 。
admission webhook  通俗来讲就是 HTTP 回调,通过定义一个http server,接收准入请求并处理。用户可以通过kubernetes提供的两种类型的 admission webhook,validating admission webhookmutating admission webhook。来完成自定义的准入策略的处理。
webhook 就是
注:从上面的流程图也可以看出,admission webhook 也是有顺序的。首先调用mutating webhook,然后会调用validating webhook。
如何使用准入控制器

使用条件:kubernetes v1.16 使用 admissionregistration.k8s.io/v1 ;kubernetes v1.9 使用 admissionregistration.k8s.io/v1beta1。
如何在集群中开启准入控制器? :查看kube-apiserver 的启动参数 --enable-admission-plugins ;通过该参数来配置要启动的准入控制器,如 --enable-admission-plugins=NodeRestriction 多个准入控制器以 , 分割,顺序无关紧要。 反之使用 --disable-admission-plugins 参数可以关闭相应的准入控制器(Refer to apiserver opts)。
通过 kubectl 命令可以看到,当前kubernetes集群所支持准入控制器的版本
  1. $ kubectl api-versions | grep admissionregistration.k8s.io/v1
  2. admissionregistration.k8s.io/v1
  3. admissionregistration.k8s.io/v1beta1
复制代码
webhook工作原理

通过上面的学习,已经了解到了两种webhook的工作原理如下所示:
mutating webhook,会在持久化前拦截在 MutatingWebhookConfiguration 中定义的规则匹配的请求。MutatingAdmissionWebhook 通过向 mutating webhook 服务器发送准入请求来执行验证。
validaing webhook,会在持久化前拦截在 ValidatingWebhookConfiguration 中定义的规则匹配的请求。ValidatingAdmissionWebhook 通过将准入请求发送到 validating webhook server来执行验证。
那么接下来将从源码中看这个在这个工作流程中,究竟做了些什么?
资源类型

对于 1.9 版本之后,也就是 v1 版本 ,admission 被定义在 k8s.io\api\admissionregistration\v1\types.go ,大同小异,因为本地只有1.18集群,所以以这个讲解。
对于 Validating Webhook 来讲实现主要都在webhook中
  1. type ValidatingWebhookConfiguration struct {
  2.     // 每个api必须包含下列的metadata,这个是kubernetes规范,可以在注释中的url看到相关文档
  3.         metav1.TypeMeta `json:",inline"`
  4.         metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
  5.         // Webhooks在这里被表示为[]ValidatingWebhook,表示我们可以注册多个
  6.         // +optional
  7.         // +patchMergeKey=name
  8.         // +patchStrategy=merge
  9.         Webhooks []ValidatingWebhook `json:"webhooks,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,2,rep,name=Webhooks"`
  10. }
复制代码
webhook,则是对这种类型的webhook提供的操作、资源等。对于这部分不做过多的注释了,因为这里本身为kubernetes API资源,官网有很详细的例子与说明。这里更多字段的意思的可以参考官方 doc
  1. type ValidatingWebhook struct {
  2.         //  admission webhook的名词,Required
  3.         Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
  4.         // ClientConfig 定义了与webhook通讯的方式 Required
  5.         ClientConfig WebhookClientConfig `json:"clientConfig" protobuf:"bytes,2,opt,name=clientConfig"`
  6.         // rule表示了webhook对于哪些资源及子资源的操作进行关注
  7.         Rules []RuleWithOperations `json:"rules,omitempty" protobuf:"bytes,3,rep,name=rules"`
  8.         // FailurePolicy 对于无法识别的value将如何处理,allowed/Ignore optional
  9.         FailurePolicy *FailurePolicyType `json:"failurePolicy,omitempty" protobuf:"bytes,4,opt,name=failurePolicy,casttype=FailurePolicyType"`
  10.         // matchPolicy 定义了如何使用“rules”列表来匹配传入的请求。
  11.         MatchPolicy *MatchPolicyType `json:"matchPolicy,omitempty" protobuf:"bytes,9,opt,name=matchPolicy,casttype=MatchPolicyType"`
  12.         NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty" protobuf:"bytes,5,opt,name=namespaceSelector"`
  13.         SideEffects *SideEffectClass `json:"sideEffects" protobuf:"bytes,6,opt,name=sideEffects,casttype=SideEffectClass"`
  14.         AdmissionReviewVersions []string `json:"admissionReviewVersions" protobuf:"bytes,8,rep,name=admissionReviewVersions"`
  15. }
复制代码
到这里了解了一个webhook资源的定义,那么这个如何使用呢?通过 Find Usages 找到一个 k8s.io/apiserver/pkg/admission/plugin/webhook/accessors.go 在使用它。这里没有注释,但在结构上可以看出,包含客户端与一系列选择器组成
  1. type mutatingWebhookAccessor struct {
  2.         *v1.MutatingWebhook
  3.         uid               string
  4.         configurationName string
  5.         initObjectSelector sync.Once
  6.         objectSelector     labels.Selector
  7.         objectSelectorErr  error
  8.         initNamespaceSelector sync.Once
  9.         namespaceSelector     labels.Selector
  10.         namespaceSelectorErr  error
  11.         initClient sync.Once
  12.         client     *rest.RESTClient
  13.         clientErr  error
  14. }
复制代码
accessor 因为包含了整个webhookconfig定义的一些动作(这里个人这么觉得)。
accessor.go 下面 有一个 GetRESTClient 方法 ,通过这里可以看出,这里做的就是使用根据 accessor 构造一个客户端。
  1. func (m *mutatingWebhookAccessor) GetRESTClient(clientManager *webhookutil.ClientManager) (*rest.RESTClient, error) {
  2.         m.initClient.Do(func() {
  3.                 m.client, m.clientErr = clientManager.HookClient(hookClientConfigForWebhook(m))
  4.         })
  5.         return m.client, m.clientErr
  6. }
复制代码
到这步骤已经没必要往下看了,因已经知道这里是请求webhook前的步骤了,下面就是何时请求了。
k8s.io\apiserver\pkg\admission\plugin\webhook\validating\dispatcher.go 下面有两个方法,Dispatch去请求我们自己定义的webhook
  1. func (d *validatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {
  2.         var relevantHooks []*generic.WebhookInvocation
  3.         // Construct all the versions we need to call our webhooks
  4.         versionedAttrs := map[schema.GroupVersionKind]*generic.VersionedAttributes{}
  5.         for _, hook := range hooks {
  6.                 invocation, statusError := d.plugin.ShouldCallHook(hook, attr, o)
  7.                 if statusError != nil {
  8.                         return statusError
  9.                 }
  10.                 if invocation == nil {
  11.                         continue
  12.                 }
  13.                 relevantHooks = append(relevantHooks, invocation)
  14.                 // If we already have this version, continue
  15.                 if _, ok := versionedAttrs[invocation.Kind]; ok {
  16.                         continue
  17.                 }
  18.                 versionedAttr, err := generic.NewVersionedAttributes(attr, invocation.Kind, o)
  19.                 if err != nil {
  20.                         return apierrors.NewInternalError(err)
  21.                 }
  22.                 versionedAttrs[invocation.Kind] = versionedAttr
  23.         }
  24.         if len(relevantHooks) == 0 {
  25.                 // no matching hooks
  26.                 return nil
  27.         }
  28.         // Check if the request has already timed out before spawning remote calls
  29.         select {
  30.         case <-ctx.Done():
  31.                 // parent context is canceled or timed out, no point in continuing
  32.                 return apierrors.NewTimeoutError("request did not complete within requested timeout", 0)
  33.         default:
  34.         }
  35.         wg := sync.WaitGroup{}
  36.         errCh := make(chan error, len(relevantHooks))
  37.         wg.Add(len(relevantHooks))
  38.     // 循环所有相关的注册的hook
  39.         for i := range relevantHooks {
  40.                 go func(invocation *generic.WebhookInvocation) {
  41.                         defer wg.Done()
  42.             // invacation 中有一个 Accessor,Accessor注册了一个相关的webhookconfig
  43.             // 也就是我们 kubectl -f 注册进来的那个webhook的相关配置
  44.                         hook, ok := invocation.Webhook.GetValidatingWebhook()
  45.                         if !ok {
  46.                                 utilruntime.HandleError(fmt.Errorf("validating webhook dispatch requires v1.ValidatingWebhook, but got %T", hook))
  47.                                 return
  48.                         }
  49.                         versionedAttr := versionedAttrs[invocation.Kind]
  50.                         t := time.Now()
  51.             // 调用了callHook去请求我们自定义的webhook
  52.                         err := d.callHook(ctx, hook, invocation, versionedAttr)
  53.                         ignoreClientCallFailures := hook.FailurePolicy != nil && *hook.FailurePolicy == v1.Ignore
  54.                         rejected := false
  55.                         if err != nil {
  56.                                 switch err := err.(type) {
  57.                                 case *webhookutil.ErrCallingWebhook:
  58.                                         if !ignoreClientCallFailures {
  59.                                                 rejected = true
  60.                                                 admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionCallingWebhookError, 0)
  61.                                         }
  62.                                 case *webhookutil.ErrWebhookRejection:
  63.                                         rejected = true
  64.                                         admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionNoError, int(err.Status.ErrStatus.Code))
  65.                                 default:
  66.                                         rejected = true
  67.                                         admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionAPIServerInternalError, 0)
  68.                                 }
  69.                         }
  70.                         admissionmetrics.Metrics.ObserveWebhook(time.Since(t), rejected, versionedAttr.Attributes, "validating", hook.Name)
  71.                         if err == nil {
  72.                                 return
  73.                         }
  74.                         if callErr, ok := err.(*webhookutil.ErrCallingWebhook); ok {
  75.                                 if ignoreClientCallFailures {
  76.                                         klog.Warningf("Failed calling webhook, failing open %v: %v", hook.Name, callErr)
  77.                                         utilruntime.HandleError(callErr)
  78.                                         return
  79.                                 }
  80.                                 klog.Warningf("Failed calling webhook, failing closed %v: %v", hook.Name, err)
  81.                                 errCh <- apierrors.NewInternalError(err)
  82.                                 return
  83.                         }
  84.                         if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {
  85.                                 err = rejectionErr.Status
  86.                         }
  87.                         klog.Warningf("rejected by webhook %q: %#v", hook.Name, err)
  88.                         errCh <- err
  89.                 }(relevantHooks[i])
  90.         }
  91.         wg.Wait()
  92.         close(errCh)
  93.         var errs []error
  94.         for e := range errCh {
  95.                 errs = append(errs, e)
  96.         }
  97.         if len(errs) == 0 {
  98.                 return nil
  99.         }
  100.         if len(errs) > 1 {
  101.                 for i := 1; i < len(errs); i++ {
  102.                         // TODO: merge status errors; until then, just return the first one.
  103.                         utilruntime.HandleError(errs[i])
  104.                 }
  105.         }
  106.         return errs[0]
  107. }
复制代码
MutatingAdmission

[code]apiVersion: admissionregistration.k8s.io/v1kind: ValidatingWebhookConfigurationmetadata:  name: "valipod-policy.example.com"webhooks:- name: "valipod-policy.example.com"  rules:    - apiGroups:   ["apps"] # 拦截资源的Group "" 表示 core。"*" 表示所有。      apiVersions: ["v1"] # 拦截资源的版本      operations:  ["CREATE"] # 什么请求下拦截      resources:   ["deployments"]  # 拦截什么资源      scope:       "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。  clientConfig: # 我们部署的webhook服务,    url: "https://10.0.0.1:81/validate" # 这里是外部模式    #      service: # service是在cluster-in模式下    #        namespace: "default"    #        name: "admission-webhook"    #        port: 81 # 服务的端口    #        path: "/mutate" # path是对应用于验证的接口    # caBundle是提供给 admission webhook CA证书    caBundle: "Ci0tLS0tQk...




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4