ToB企服应用市场:ToB评测及商务社交产业平台

标题: 18.2 k8s-apiserver监控源码解读 [打印本页]

作者: 雁过留声    时间: 2024-9-28 08:48
标题: 18.2 k8s-apiserver监控源码解读
本节重点介绍 :


k8s源码地址分布

k8s代码库


组件堆栈列表 地址

Repositories currently staged here:

下载 apiserver 源码

  1. go get -d  k8s.io/apiserver
复制代码
分析apiserver 监控源码

以qps指标 apiserver_request_total为例

定位源码位置


  1.         requestCounter = compbasemetrics.NewCounterVec(
  2.                 &compbasemetrics.CounterOpts{
  3.                         Name:           "apiserver_request_total",
  4.                         Help:           "Counter of apiserver requests broken out for each verb, dry run value, group, version, resource, scope, component, and HTTP response code.",
  5.                         StabilityLevel: compbasemetrics.STABLE,
  6.                 },
  7.                 []string{"verb", "dry_run", "group", "version", "resource", "subresource", "scope", "component", "code"},
  8.         )
复制代码

  1. apiserver_request_total{code="0", component="apiserver", contentType="application/json", group="apiregistration.k8s.io", instance="172.20.70.205:6443", job="kubernetes-apiservers", resource="apiservices", scope="cluster", verb="WATCH", version="v1beta1"}
  2. 9014
  3. apiserver_request_total{code="0", component="apiserver", contentType="application/json", group="apps", instance="172.20.70.205:6443", job="kubernetes-apiservers", resource="daemonsets", scope="cluster", verb="WATCH", version="v1"}
  4. 8863
  5. apiserver_request_total{code="0", component="apiserver", contentType="application/json", group="apps", instance="172.20.70.205:6443", job="kubernetes-apiservers", resource="deployments", scope="cluster", verb="WATCH", version="v1"}
  6. 8830
  7. apiserver_request_total{code="0", component="apiserver", contentType="application/json", group="crd.projectcalico.org", instance="172.20.70.205:6443", job="kubernetes-apiservers", resource="bgpconfigurations", scope="cluster", verb="WATCH", version="v1"}
  8. 5575
  9. apiserver_request_total{code="0", component="apiserver", contentType="application/json", group="crd.projectcalico.org", instance="172.20.70.205:6443", job="kubernetes-apiservers", resource="bgppeers", scope="cluster", verb="WATCH", version="v1"}
  10. 2791
  11. apiserver_request_total{code="0", component="apiserver", contentType="application/json", group="crd.projectcalico.org", instance="172.20.70.205:6443", job="kubernetes-apiservers", resource="blockaffinities", scope="cluster", verb="WATCH", version="v1"}
  12. 2942
  13. apiserver_request_total{code="0", component="apiserver", contentType="application/json", group="crd.projectcalico.org", instance="172.20.70.205:6443", job="kubernetes-apiservers", resource="clusterinformations", scope="cluster", verb="WATCH", version="v1"}
  14. 2770
复制代码
处理请求的metric 函数 MonitorRequest


  1. // MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
  2. // a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
  3. func MonitorRequest(req *http.Request, verb, group, version, resource, subresource, scope, component string, deprecated bool, removedRelease string, httpCode, respSize int, elapsed time.Duration) {
  4.         // We don't use verb from <requestInfo>, as this may be propagated from
  5.         // InstrumentRouteFunc which is registered in installer.go with predefined
  6.         // list of verbs (different than those translated to RequestInfo).
  7.         // However, we need to tweak it e.g. to differentiate GET from LIST.
  8.         reportedVerb := cleanVerb(CanonicalVerb(strings.ToUpper(req.Method), scope), verb, req)
  9.         dryRun := cleanDryRun(req.URL)
  10.         elapsedSeconds := elapsed.Seconds()
  11.         requestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, codeToString(httpCode)).Inc()
  12.         // MonitorRequest happens after authentication, so we can trust the username given by the request
  13.         info, ok := request.UserFrom(req.Context())
  14.         if ok && info.GetName() == user.APIServerUser {
  15.                 apiSelfRequestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, resource, subresource).Inc()
  16.         }
  17.         if deprecated {
  18.                 deprecatedRequestGauge.WithContext(req.Context()).WithLabelValues(group, version, resource, subresource, removedRelease).Set(1)
  19.                 audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true")
  20.                 if len(removedRelease) > 0 {
  21.                         audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease)
  22.                 }
  23.         }
  24.         requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds)
  25.         // We are only interested in response sizes of read requests.
  26.         if verb == "GET" || verb == "LIST" {
  27.                 responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))
  28.         }
  29. }
复制代码

  1. requestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, codeToString(httpCode)).Inc()
复制代码

  1. func (v *CounterVec) WithLabelValues(lvs ...string) CounterMetric {
  2.         if !v.IsCreated() {
  3.                 return noop // return no-op counter
  4.         }
  5.         if v.LabelValueAllowLists != nil {
  6.                 v.LabelValueAllowLists.ConstrainToAllowedList(v.originalLabels, lvs)
  7.         }
  8.         return v.CounterVec.WithLabelValues(lvs...)
  9. }
复制代码
apiserver自身请求的计数


  1.         info, ok := request.UserFrom(req.Context())
  2.         if ok && info.GetName() == user.APIServerUser {
  3.                 apiSelfRequestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, resource, subresource).Inc()
  4.         }
复制代码
要被废弃的api被请求


  1.         if deprecated {
  2.                 deprecatedRequestGauge.WithContext(req.Context()).WithLabelValues(group, version, resource, subresource, removedRelease).Set(1)
  3.                 audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true")
  4.                 if len(removedRelease) > 0 {
  5.                         audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease)
  6.                 }
  7.         }
复制代码

  1. apiserver_requested_deprecated_apis{group="apiregistration.k8s.io", instance="172.20.70.205:6443", job="kubernetes-apiservers", removed_release="1.22", resource="apiservices", version="v1beta1"}
  2. 1
  3. apiserver_requested_deprecated_apis{group="authorization.k8s.io", instance="172.20.70.205:6443", job="kubernetes-apiservers", removed_release="1.22", resource="subjectaccessreviews", version="v1beta1"}
  4. 1
  5. apiserver_requested_deprecated_apis{group="certificates.k8s.io", instance="172.20.70.205:6443", job="kubernetes-apiservers", removed_release="1.22", resource="certificatesigningrequests", version="v1beta1"}
  6. 1
  7. apiserver_requested_deprecated_apis{group="extensions", instance="172.20.70.205:6443", job="kubernetes-apiservers", removed_release="1.22", resource="ingresses", version="v1beta1"}
  8. 1
  9. apiserver_requested_deprecated_apis{group="networking.k8s.io", instance="172.20.70.205:6443", job="kubernetes-apiservers", removed_release="1.22", resource="ingressclasses", version="v1beta1"}
  10. 1
  11. apiserver_requested_deprecated_apis{group="networking.k8s.io", instance="172.20.70.205:6443", job="kubernetes-apiservers", removed_release="1.22", resource="ingresses", version="v1beta1"}
  12. 1
  13. apiserver_requested_deprecated_apis{group="scheduling.k8s.io", instance="172.20.70.205:6443", job="kubernetes-apiservers", removed_release="1.22", resource="priorityclasses", version="v1beta1"}
  14. 1
复制代码
设置请求延迟值


  1. requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds)
复制代码

  1.         requestLatencies = compbasemetrics.NewHistogramVec(
  2.                 &compbasemetrics.HistogramOpts{
  3.                         Name: "apiserver_request_duration_seconds",
  4.                         Help: "Response latency distribution in seconds for each verb, dry run value, group, version, resource, subresource, scope and component.",
  5.                         // This metric is used for verifying api call latencies SLO,
  6.                         // as well as tracking regressions in this aspects.
  7.                         // Thus we customize buckets significantly, to empower both usecases.
  8.                         Buckets: []float64{0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
  9.                                 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60},
  10.                         StabilityLevel: compbasemetrics.STABLE,
  11.                 },
  12.                 []string{"verb", "dry_run", "group", "version", "resource", "subresource", "scope", "component"},
  13.         )
复制代码

  1. histogram_quantile(0.99, sum(rate(apiserver_request_duration_seconds_bucket{job="kubernetes-apiservers"}[5m])) by (verb, le))
复制代码

设置 http响应的大小histogram


  1.         if verb == "GET" || verb == "LIST" {
  2.                 responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))
  3.         }
复制代码

  1.         responseSizes = compbasemetrics.NewHistogramVec(
  2.                 &compbasemetrics.HistogramOpts{
  3.                         Name: "apiserver_response_sizes",
  4.                         Help: "Response size distribution in bytes for each group, version, verb, resource, subresource, scope and component.",
  5.                         // Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB).
  6.                         Buckets:        compbasemetrics.ExponentialBuckets(1000, 10.0, 7),
  7.                         StabilityLevel: compbasemetrics.ALPHA,
  8.                 },
  9.                 []string{"verb", "group", "version", "resource", "subresource", "scope", "component"},
  10.         )
复制代码
追踪MonitorRequest调用链

InstrumentRouteFunc在prometheus的InstrumentHandlerFunc基础上封装了go-restful的RouteFunction

  1. // InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps
  2. // the go-restful RouteFunction instead of a HandlerFunc plus some Kubernetes endpoint specific information.
  3. func InstrumentRouteFunc(verb, group, version, resource, subresource, scope, component string, deprecated bool, removedRelease string, routeFunc restful.RouteFunction) restful.RouteFunction {
  4.         return restful.RouteFunction(func(req *restful.Request, response *restful.Response) {
  5.                 requestReceivedTimestamp, ok := request.ReceivedTimestampFrom(req.Request.Context())
  6.                 if !ok {
  7.                         requestReceivedTimestamp = time.Now()
  8.                 }
  9.                 delegate := &ResponseWriterDelegator{ResponseWriter: response.ResponseWriter}
  10.                 //lint:file-ignore SA1019 Keep supporting deprecated http.CloseNotifier
  11.                 _, cn := response.ResponseWriter.(http.CloseNotifier)
  12.                 _, fl := response.ResponseWriter.(http.Flusher)
  13.                 _, hj := response.ResponseWriter.(http.Hijacker)
  14.                 var rw http.ResponseWriter
  15.                 if cn && fl && hj {
  16.                         rw = &fancyResponseWriterDelegator{delegate}
  17.                 } else {
  18.                         rw = delegate
  19.                 }
  20.                 response.ResponseWriter = rw
  21.                 routeFunc(req, response)
  22.                 MonitorRequest(req.Request, verb, group, version, resource, subresource, scope, component, deprecated, removedRelease, delegate.Status(), delegate.ContentLength(), time.Since(requestReceivedTimestamp))
  23.         })
  24. }
复制代码
registerResourceHandlers 中根据不同的verb处理


  1.                 case "LIST": // List all resources of a kind.
  2.                         doc := "list objects of kind " + kind
  3.                         if isSubresource {
  4.                                 doc = "list " + subresource + " of objects of kind " + kind
  5.                         }
  6.                         handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
  7.                         handler = utilwarning.AddWarningsHandler(handler, warnings)
  8.                         route := ws.GET(action.Path).To(handler).
  9.                                 Doc(doc).
  10.                                 Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
  11.                                 Operation("list"+namespaced+kind+strings.Title(subresource)+operationSuffix).
  12.                                 Produces(append(storageMeta.ProducesMIMETypes(action.Verb), allMediaTypes...)...).
  13.                                 Returns(http.StatusOK, "OK", versionedList).
  14.                                 Writes(versionedList)
复制代码

  1. // Install handlers for API resources.
  2. func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
  3.         var apiResources []metav1.APIResource
  4.         var resourceInfos []*storageversion.ResourceInfo
  5.         var errors []error
  6.         ws := a.newWebService()
  7.         // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
  8.         paths := make([]string, len(a.group.Storage))
  9.         var i int = 0
  10.         for path := range a.group.Storage {
  11.                 paths[i] = path
  12.                 i++
  13.         }
  14.         sort.Strings(paths)
  15.         for _, path := range paths {
  16.                 apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
  17.                 if err != nil {
  18.                         errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
  19.                 }
  20.                 if apiResource != nil {
  21.                         apiResources = append(apiResources, *apiResource)
  22.                 }
  23.                 if resourceInfo != nil {
  24.                         resourceInfos = append(resourceInfos, resourceInfo)
  25.                 }
  26.         }
  27.         return apiResources, resourceInfos, ws, errors
  28. }
复制代码
本节重点总结 :



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4