flink operator v1.10对接华为云对象存储OBS

打印 上一主题 下一主题

主题 880|帖子 880|积分 2644

1 概述

flink operator及其flink集群,默认不直接支持华为云OBS,需要在这些java步调的插件目录放一个jar包,以及修改flink设置后,才气支持集成华为云OBS。
相干链接参考:
  1. https://support.huaweicloud.com/bestpractice-obs/obs_05_1516.html
复制代码

2 环境准备

2.1 华为云kubernetes集群

准备一个kubernetes集群,如下图所示:



2.2 flink operator helm包下载地址

  1. https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
复制代码

2.3 cert-manager yaml文件下载地址

  1. https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml
复制代码

2.4 准备flink应用示例

  1. https://github.com/apache/flink/tree/master/flink-examples
复制代码
将flink官方示例的代码编译成jar包,再上传到对象存储OBS,如下图所示:

这些jar包存放在华为云OBS对象存储上,flink operator和可以通过OBS协议拉取jar包,终极提交给flink集群,并且flink集群的jobmanager、flink taskmanager也能读写OBS对象存储。


3 部署

3.1 安装cert-manager

此组件是flink operator webhook的一个依靠,因此先安装它。
  1. cd /tmp
  2. wget https://github.com/jetstack/cert-manager/releases/download/v1.17.1/cert-manager.yaml
  3. kubectl apply -f cert-manager.yaml
复制代码

3.2 安装helm二进制工具

  1. cd /tmp
  2. wget https://get.helm.sh/helm-v3.16.2-linux-amd64.tar.gz
  3. tar xf helm-v3.16.2-linux-amd64.tar.gz
  4. cd linux-amd64
  5. /bin/cp -f helm /usr/bin/
  6. helm env
复制代码

3.3 部署flink operator

下载fink operator的helm包,解压文件,末了通过helm命令将它部署在flink namespace中。
  1. cd /tmpwget https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
  2. tar xf flink-kubernetes-operator-1.10.0-helm.tgz
复制代码
修改flink-kubernetes-operator/values.yaml文件,在文件的defaultConfiguration.flink-conf.yaml字段下新增如下内容:
  1. defaultConfiguration:
  2.   flink-conf.yaml: |+
  3.     fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
  4.     fs.obs.access.key: *********你的ak*********
  5.     fs.obs.secret.key: *********你的sk*********
  6.     fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com     # 这是对象存储端点,依据实际情况填写
复制代码
部署k8s资源,命令如下:
  1. helm upgrade --install flink-operator -n flink --create-namespace \
  2. --set image.repository=swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator \
  3. --set image.tag=1.10.0 \
  4. ./flink-kubernetes-operator/
复制代码
我将flink-obs的jar包放入到镜像swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45中,此镜像是公共镜像,各人可随意拉取使用。
接着,更新operator deployment(需要使用initContainer和obs-plugin的volume的挂载),直接kubectl apply如下内容即可:
  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4.   annotations:
  5.     meta.helm.sh/release-name: flink-operator
  6.     meta.helm.sh/release-namespace: flink
  7.   generation: 4
  8.   labels:
  9.     app.kubernetes.io/managed-by: Helm
  10.     app.kubernetes.io/name: flink-kubernetes-operator
  11.     app.kubernetes.io/version: 1.10.0
  12.     helm.sh/chart: flink-kubernetes-operator-1.10.0
  13.   name: flink-kubernetes-operator
  14.   namespace: flink
  15. spec:
  16.   replicas: 1
  17.   selector:
  18.     matchLabels:
  19.       app.kubernetes.io/name: flink-kubernetes-operator
  20.   strategy:
  21.     type: Recreate
  22.   template:
  23.     metadata:
  24.       annotations:
  25.         kubectl.kubernetes.io/default-container: flink-kubernetes-operator
  26.       creationTimestamp: null
  27.       labels:
  28.         app.kubernetes.io/name: flink-kubernetes-operator
  29.     spec:
  30.       initContainers:
  31.       - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
  32.         name: sidecar
  33.         command: ["sh"]
  34.         args: [
  35.                 "-c",
  36.                 "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
  37.         ]
  38.         volumeMounts:
  39.         - name: obs-plugin
  40.           mountPath: /opt/flink/plugins/obs-fs-hadoop
  41.       containers:
  42.       - command:
  43.         - /docker-entrypoint.sh
  44.         - operator
  45.         env:
  46.         - name: OPERATOR_NAMESPACE
  47.           valueFrom:
  48.             fieldRef:
  49.               apiVersion: v1
  50.               fieldPath: metadata.namespace
  51.         - name: HOST_IP
  52.           valueFrom:
  53.             fieldRef:
  54.               apiVersion: v1
  55.               fieldPath: status.hostIP
  56.         - name: POD_IP
  57.           valueFrom:
  58.             fieldRef:
  59.               apiVersion: v1
  60.               fieldPath: status.podIP
  61.         - name: POD_NAME
  62.           valueFrom:
  63.             fieldRef:
  64.               apiVersion: v1
  65.               fieldPath: metadata.name
  66.         - name: OPERATOR_NAME
  67.           value: flink-kubernetes-operator
  68.         - name: FLINK_CONF_DIR
  69.           value: /opt/flink/conf
  70.         - name: FLINK_PLUGINS_DIR
  71.           value: /opt/flink/plugins
  72.         - name: LOG_CONFIG
  73.           value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
  74.         - name: JVM_ARGS
  75.         image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
  76.         imagePullPolicy: IfNotPresent
  77.         livenessProbe:
  78.           failureThreshold: 3
  79.           httpGet:
  80.             path: /
  81.             port: health-port
  82.             scheme: HTTP
  83.           initialDelaySeconds: 30
  84.           periodSeconds: 10
  85.           successThreshold: 1
  86.           timeoutSeconds: 1
  87.         name: flink-kubernetes-operator
  88.         ports:
  89.         - containerPort: 8085
  90.           name: health-port
  91.           protocol: TCP
  92.         resources: {}
  93.         securityContext: {}
  94.         startupProbe:
  95.           failureThreshold: 30
  96.           httpGet:
  97.             path: /
  98.             port: health-port
  99.             scheme: HTTP
  100.           periodSeconds: 10
  101.           successThreshold: 1
  102.           timeoutSeconds: 1
  103.         terminationMessagePath: /dev/termination-log
  104.         terminationMessagePolicy: File
  105.         volumeMounts:
  106.         - mountPath: /opt/flink/conf
  107.           name: flink-operator-config-volume
  108.         - mountPath: /opt/flink/artifacts
  109.           name: flink-artifacts-volume
  110.         - name: obs-plugin
  111.           mountPath: /opt/flink/plugins/obs-fs-hadoop
  112.       - command:
  113.         - /docker-entrypoint.sh
  114.         - webhook
  115.         env:
  116.         - name: WEBHOOK_KEYSTORE_PASSWORD
  117.           valueFrom:
  118.             secretKeyRef:
  119.               key: password
  120.               name: flink-operator-webhook-secret
  121.         - name: WEBHOOK_KEYSTORE_FILE
  122.           value: /certs/keystore.p12
  123.         - name: WEBHOOK_KEYSTORE_TYPE
  124.           value: pkcs12
  125.         - name: WEBHOOK_SERVER_PORT
  126.           value: "9443"
  127.         - name: LOG_CONFIG
  128.           value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
  129.         - name: JVM_ARGS
  130.         - name: FLINK_CONF_DIR
  131.           value: /opt/flink/conf
  132.         - name: FLINK_PLUGINS_DIR
  133.           value: /opt/flink/plugins
  134.         - name: OPERATOR_NAMESPACE
  135.           valueFrom:
  136.             fieldRef:
  137.               apiVersion: v1
  138.               fieldPath: metadata.namespace
  139.         image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
  140.         imagePullPolicy: IfNotPresent
  141.         name: flink-webhook
  142.         resources: {}
  143.         securityContext: {}
  144.         terminationMessagePath: /dev/termination-log
  145.         terminationMessagePolicy: File
  146.         volumeMounts:
  147.         - mountPath: /certs
  148.           name: keystore
  149.           readOnly: true
  150.         - mountPath: /opt/flink/conf
  151.           name: flink-operator-config-volume
  152.       dnsPolicy: ClusterFirst
  153.       restartPolicy: Always
  154.       schedulerName: default-scheduler
  155.       securityContext:
  156.         runAsGroup: 9999
  157.         runAsUser: 9999
  158.       serviceAccount: flink-operator
  159.       serviceAccountName: flink-operator
  160.       terminationGracePeriodSeconds: 30
  161.       volumes:
  162.       - configMap:
  163.           defaultMode: 420
  164.           items:
  165.           - key: flink-conf.yaml
  166.             path: flink-conf.yaml
  167.           - key: log4j-operator.properties
  168.             path: log4j-operator.properties
  169.           - key: log4j-console.properties
  170.             path: log4j-console.properties
  171.           name: flink-operator-config
  172.         name: flink-operator-config-volume
  173.       - emptyDir: {}
  174.         name: flink-artifacts-volume
  175.       - name: keystore
  176.         secret:
  177.           defaultMode: 420
  178.           items:
  179.           - key: keystore.p12
  180.             path: keystore.p12
  181.           secretName: webhook-server-cert
  182.       - name: obs-plugin
  183.         emptyDir: {}
复制代码

3.4 部署flink session cluster

kubectl apply以下资源即可部署一个flink session集群,文件内容如下:
  1. apiVersion: flink.apache.org/v1beta1
  2. kind: FlinkDeployment
  3. metadata:
  4.   name: flink-session-cluster
  5.   namespace: flink
  6. spec:
  7.   image: swr.cn-south-1.myhuaweicloud.com/migrator/flink:1.19
  8.   flinkVersion: v1_19
  9.   flinkConfiguration:
  10.     fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
  11.     fs.obs.access.key: *********你的ak*********
  12.     fs.obs.secret.key: *********你的sk*********
  13.     fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com   # 这是对象存储端点,依据实际情况填写
  14.   jobManager:
  15.     resource:
  16.       memory: "2048m"
  17.       cpu: 2
  18.   taskManager:
  19.     resource:
  20.       memory: "2048m"
  21.       cpu: 2
  22.   serviceAccount: flink
  23.   podTemplate:
  24.     spec:
  25.       volumes:
  26.       - name: obs-plugin
  27.         emptyDir: {}
  28.       containers:
  29.       # Do not change the main container name
  30.       - name: flink-main-container
  31.         volumeMounts:
  32.         - name: obs-plugin
  33.           mountPath: /opt/flink/plugins/obs-fs-hadoop
  34.       initContainers:
  35.       - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
  36.         name: sidecar
  37.         command: ["sh"]
  38.         args: [
  39.                 "-c",
  40.                 "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
  41.         ]
  42.         volumeMounts:
  43.         - name: obs-plugin
  44.           mountPath: /opt/flink/plugins/obs-fs-hadoop
复制代码


4 提交flink作业

kubectl apply以下资源即可:
  1. apiVersion: flink.apache.org/v1beta1
  2. kind: FlinkSessionJob
  3. metadata:
  4.   name: basic-session-job-example
  5.   namespace: flink
  6. spec:
  7.   deploymentName: flink-session-cluster
  8.   job:
  9.     jarURI: obs://你的桶/StateMachineExample.jar    # jar包的位置,按实际情况填写
  10.     parallelism: 1
复制代码

可见flink作业是running状态,阐明jar包被flink operator从华为云对象存储OBS拉取下来并提交到flink集群中。
继续查看flink operator日志,可以看见obs相干的信息:



小结

本文介绍flink operator及其管理的flink集群是如何对接到华为云对象存储OBS,对接完成后,不仅可以将作业的jar包存储在对象存储,也可以将flink作业的状态、输入输出等存储在对象存储。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表