ToB企服应用市场:ToB评测及商务社交产业平台

标题: Kafka原理介绍+安装+基本操作(kafka on k8s) [打印本页]

作者: 魏晓东    时间: 2022-8-25 07:09
标题: Kafka原理介绍+安装+基本操作(kafka on k8s)
目录

一、Kafka概述

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
1)Kafka的特性

2)应用场景

二、Kafka架构简介

Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。

写入流程


三、Kakfa的设计思想

四、Zookeeper在Kafka中的作用

zookeeper 是 kafka 不可分割的一部分。接下来就来讲讲zookeeper在kafka中作用。
1)记录和维护broker状态

2)控制器(leader )选举

3)限额权限

4)记录 ISR(已同步的副本)

5)node 和 topic 注册

6)topic 配置

kafka 在 zookeeper 中的存储结构如下图所示:

五、Leader选举

1)控制器(Broker)选举Leader机制

所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。

2)分区副本选举Leader机制

在kafka的集群中,会存在着多个主题topic,在每一个topic中,又被划分为多个partition,为了防止数据不丢失,每一个partition又有多个副本,在整个集群中,总共有三种副本角色:

默认的,如果follower与leader之间超过10s内没有发送请求,或者说没有收到请求数据,此时该follower就会被认为“不同步副本”。而持续请求的副本就是“同步副本”,当leader发生故障时,只有“同步副本”才可以被选举为leader。其中的请求超时时间可以通过参数replica.lag.time.max.ms参数来配置。
我们希望每个分区的leader可以分布到不同的broker中,尽可能的达到负载均衡,所以会有一个优先leader,如果我们设置参数auto.leader.rebalance.enable为true,那么它会检查优先leader是否是真正的leader,如果不是,则会触发选举,让优先leader成为leader。
3)消费组(consumer group)选举机制

组协调器会为消费组(consumer group)内的所有消费者选举出一个leader,这个选举的算法也很简单,第一个加入consumer group的consumer即为leader,如果某一时刻leader消费者退出了消费组,那么会重新 随机 选举一个新的leader。
六、kubernetes(k8s) helm3安装zookeeper、kafka

k8s上安装kafka,可以使用helm,将kafka作为一个应用安装。当然这首先要你的k8s支持使用helm安装。helm的介绍和安装见:Kubernetes(k8s)包管理器Helm(Helm3)介绍&Helm3安装Harbor
1)前期准备

1、创建命名空间
  1. $ mkdir -p /opt/bigdata/kafka
  2. $ cd /opt/bigdata/kafka
  3. $ kubectl create namespace bigdata
复制代码
2、创建持久化存储SC(bigdata-nfs-storage)
  1. cat << EOF > bigdata-sc.yaml
  2. apiVersion: v1
  3. kind: ServiceAccount
  4. metadata:
  5.   name: nfs-client-provisioner
  6.   # replace with namespace where provisioner is deployed
  7.   namespace: bigdata        #根据实际环境设定namespace,下面类同
  8. ---
  9. kind: ClusterRole
  10. apiVersion: rbac.authorization.k8s.io/v1
  11. metadata:
  12.   name: nfs-client-provisioner-runner
  13.   namespace: bigdata
  14. rules:
  15.   - apiGroups: [""]
  16.     resources: ["persistentvolumes"]
  17.     verbs: ["get", "list", "watch", "create", "delete"]
  18.   - apiGroups: [""]
  19.     resources: ["persistentvolumeclaims"]
  20.     verbs: ["get", "list", "watch", "update"]
  21.   - apiGroups: ["storage.k8s.io"]
  22.     resources: ["storageclasses"]
  23.     verbs: ["get", "list", "watch"]
  24.   - apiGroups: [""]
  25.     resources: ["events"]
  26.     verbs: ["create", "update", "patch"]
  27. ---
  28. kind: ClusterRoleBinding
  29. apiVersion: rbac.authorization.k8s.io/v1
  30. metadata:
  31.   name: run-nfs-client-provisioner
  32. subjects:
  33.   - kind: ServiceAccount
  34.     name: nfs-client-provisioner
  35.     namespace: bigdata
  36.     # replace with namespace where provisioner is deployed
  37. roleRef:
  38.   kind: ClusterRole
  39.   name: nfs-client-provisioner-runner
  40.   apiGroup: rbac.authorization.k8s.io
  41. ---
  42. kind: Role
  43. apiVersion: rbac.authorization.k8s.io/v1
  44. metadata:
  45.   name: leader-locking-nfs-client-provisioner
  46.   namespace: bigdata
  47.     # replace with namespace where provisioner is deployed
  48. rules:
  49.   - apiGroups: [""]
  50.     resources: ["endpoints"]
  51.     verbs: ["get", "list", "watch", "create", "update", "patch"]
  52. ---
  53. kind: RoleBinding
  54. apiVersion: rbac.authorization.k8s.io/v1
  55. metadata:
  56.   name: leader-locking-nfs-client-provisioner
  57.   namespace: bigdata
  58. subjects:
  59.   - kind: ServiceAccount
  60.     name: nfs-client-provisioner
  61.     # replace with namespace where provisioner is deployed
  62.     namespace: bigdata
  63. roleRef:
  64.   kind: Role
  65.   name: leader-locking-nfs-client-provisioner
  66.   apiGroup: rbac.authorization.k8s.io
  67. ---
  68. kind: Deployment
  69. apiVersion: apps/v1
  70. metadata:
  71.   name: nfs-client-provisioner
  72.   namespace: bigdata
  73. spec:
  74.   replicas: 1
  75.   strategy:
  76.     type: Recreate
  77.   selector:
  78.     matchLabels:
  79.       app: nfs-client-provisioner
  80.   template:
  81.     metadata:
  82.       labels:
  83.         app: nfs-client-provisioner
  84.     spec:
  85.       serviceAccountName: nfs-client-provisioner
  86.       containers:
  87.         - name: nfs-client-provisioner
  88.           image: quay.io/external_storage/nfs-client-provisioner:latest
  89.           volumeMounts:
  90.             - name: nfs-client-root
  91.               mountPath: /persistentvolumes #容器内挂载点
  92.           env:
  93.             - name: PROVISIONER_NAME
  94.               value: fuseim.pri/ifs
  95.             - name: NFS_SERVER
  96.               value: 192.168.0.113
  97.             - name: NFS_PATH
  98.               value: /opt/nfsdata
  99.       volumes:
  100.         - name: nfs-client-root #宿主机挂载点
  101.           nfs:
  102.             server: 192.168.0.113
  103.             path: /opt/nfsdata
  104. ---
  105. apiVersion: storage.k8s.io/v1
  106. kind: StorageClass
  107. metadata:
  108.   name: bigdata-nfs-storage
  109.   namespace: bigdata
  110. provisioner: fuseim.pri/ifs  # or choose another name, must match deployment's env PROVISIONER_NAME'
  111. reclaimPolicy: Retain        #回收策略:Retain(保留)、 Recycle(回收)或者Delete(删除)
  112. volumeBindingMode: Immediate    #volumeBindingMode存储卷绑定策略
  113. allowVolumeExpansion: true    #pvc是否允许扩容
  114. EOF
复制代码
执行
  1. $ kubectl apply -f bigdata-sc.yaml
  2. $ kubectl get sc -n bigdata
  3. $ kubectl describe sc bigdata-nfs-storage -n bigdata
复制代码

3、helm添加bitnami仓库
  1. $ helm repo add bitnami https://charts.bitnami.com/bitnami
复制代码
2)部署zookeeper集群
  1. $ helm install zookeeper bitnami/zookeeper \
  2. --namespace bigdata \
  3. --set replicaCount=3 --set auth.enabled=false \
  4. --set allowAnonymousLogin=true \
  5. --set persistence.storageClass=bigdata-nfs-storage \
  6. --set persistence.size=1Gi
复制代码

查看,一定看到所有pod都是正常运行才ok
  1. $ kubectl get pod,pv,svc -n bigdata -o wide
复制代码

验证
内部连接测试
  1. $ export POD_NAME=$(kubectl get pods --namespace bigdata -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
  2. $ kubectl exec -it $POD_NAME -n bigdata -- zkCli.sh
复制代码

外部连接测试
  1. # 先删掉本地端口对应的进程,要不然就得换连接端口了
  2. $ netstat -tnlp|grep 127.0.0.1:2181|awk '{print int($NF)}'|xargs kill -9
  3. # 外部连接测试
  4. $ kubectl port-forward --namespace bigdata svc/zookeeper 2181:2181 &
  5. # 需要本机安装zk客户端
  6. $ zkCli.sh 127.0.0.1:21
复制代码
3)部署kafka集群

1、查看zookeeper集群状态
  1. $ helm status zookeeper -n bigdata
复制代码
  1. NAME: zookeeper
  2. LAST DEPLOYED: Sat Dec  4 13:38:16 2021
  3. NAMESPACE: bigdata
  4. STATUS: deployed
  5. REVISION: 1
  6. TEST SUITE: None
  7. NOTES:
  8. CHART NAME: zookeeper
  9. CHART VERSION: 7.4.13
  10. APP VERSION: 3.7.0
  11. ** Please be patient while the chart is being deployed **
  12. ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
  13.     zookeeper.bigdata.svc.cluster.local
  14. To connect to your ZooKeeper server run the following commands:
  15.     export POD_NAME=$(kubectl get pods --namespace bigdata -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/                           component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
  16.     kubectl exec -it $POD_NAME -- zkCli.sh
  17. To connect to your ZooKeeper server from outside the cluster execute the following commands:
  18.     kubectl port-forward --namespace bigdata svc/zookeeper 2181:2181 &
  19.     zkCli.sh 127.0.0.1:2181
复制代码
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
  1. zookeeper.bigdata.svc.cluster.local
复制代码
安装
  1. $ helm install kafka bitnami/kafka \
  2. --namespace bigdata \
  3. --set zookeeper.enabled=false \
  4. --set replicaCount=3 \
  5. --set externalZookeeper.servers=zookeeper.bigdata.svc.cluster.local \
  6. --set persistence.storageClass=bigdata-nfs-storage
复制代码
  1. NAME: kafka
  2. LAST DEPLOYED: Sat Dec  4 15:37:33 2021
  3. NAMESPACE: bigdata
  4. STATUS: deployed
  5. REVISION: 1
  6. TEST SUITE: None
  7. NOTES:
  8. CHART NAME: kafka
  9. CHART VERSION: 14.4.3
  10. APP VERSION: 2.8.1
  11. ** Please be patient while the chart is being deployed **
  12. Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
  13.     kafka.bigdata.svc.cluster.local
  14. Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
  15.     kafka-0.kafka-headless.bigdata.svc.cluster.local:9092
  16.     kafka-1.kafka-headless.bigdata.svc.cluster.local:9092
  17.     kafka-2.kafka-headless.bigdata.svc.cluster.local:9092
  18. To create a pod that you can use as a Kafka client run the following commands:
  19.     kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace bigdata --command -- sleep infinity
  20.     kubectl exec --tty -i kafka-client --namespace bigdata -- bash
  21.     PRODUCER:
  22.         kafka-console-producer.sh \
  23.             --broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-hea                           dless.bigdata.svc.cluster.local:9092 \
  24.             --topic test
  25.     CONSUMER:
  26.         kafka-console-consumer.sh \
  27.             --bootstrap-server kafka.bigdata.svc.cluster.local:9092 \
  28.             --topic test \
  29.             --from-beginning
复制代码
查看
  1. $ kubectl get pod,svc -n bigdata
复制代码

4)简单使用

测试,安装上面提示,先创建一个client
  1. $ kubectl run kafka-client --restart='Always' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace bigdata --command -- sleep infinity
复制代码
打开两个窗口(一个作为生产者:producer,一个作为消费者:consumer),但是两个窗口都得先登录客户端
  1. $ kubectl exec --tty -i kafka-client --namespace bigdata -- bash
复制代码
producer
  1. $ kafka-console-producer.sh \
  2. --broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-headless.bigdata.svc.cluster.local:9092 \
  3. --topic test
复制代码
consumer
  1. $ kafka-console-consumer.sh \
  2. --bootstrap-server kafka.bigdata.svc.cluster.local:9092 \
  3. --topic test \
  4. --from-beginning
复制代码
在producer端输入,consumer会实时打印

1、创建Topic(一个副本一个分区)
  1. --create: 指定创建topic动作
  2. --topic:指定新建topic的名称
  3. --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
  4. --config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration
  5. --partitions:指定当前创建的kafka分区数量,默认为1个
  6. --replication-factor:指定每个分区的复制因子个数,默认1个
复制代码
  1. $ kafka-topics.sh --create --topic mytest --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --partitions 1 --replication-factor 1
  2. # 查看
  3. $ kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181  --topic mytest
复制代码

2、删除Topic
  1. # 先查看topic列表
  2. $ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
  3. # 删除
  4. $ kafka-topics.sh --delete --topic mytest --zookeeper zookeeper.bigdata.svc.cluster.local:2181
  5. # 再查看,发现topic还在
  6. $ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
复制代码

其实上面没删除,只是标记了(只会删除zookeeper中的元数据,消息文件须手动删除)
Note: This will have no impact if delete.topic.enable is not set to true.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:

3、修改Topic信息
kafka默认的只保存7天的数据,时间一到就删除数据,当遇到磁盘过小,存放的数据量过大,可以设置缩短这个时间。
  1. # 先创建一个topic
  2. $ kafka-topics.sh --create --topic test001 --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --partitions 1 --replication-factor 1
  3. # 修改,设置数据过期时间(-1表示不过期)
  4. $ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 -topic test001 --alter --config retention.ms=259200000
  5. # 修改多字段
  6. $ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 -topic test001 --alter --config max.message.bytes=128000 retention.ms=259200000
  7. $ kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181  --topic test001
复制代码

4、增加topic分区数
  1. $ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --alter --topic test --partitions 10
  2. $  kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181  --topic test
复制代码

5、查看Topic列表
  1. $ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
复制代码

6、列出所有主题中的所有用户组
  1. $ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --list
复制代码
7、查询消费者组详情(数据积压情况)
  1. # 生产者$ kafka-console-producer.sh \
  2. --broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-headless.bigdata.svc.cluster.local:9092 \
  3. --topic test# 消费者带group.id$ kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --topic test --consumer-property group.id=mygroup# 查看消费组情况$ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --describe --group mygroup
复制代码

消费积压情况分析
生产和消费的操作上面已经实验过了,这里就不再重复了,更多的操作,可以参考kafka官方文档

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4