ToB企服应用市场:ToB评测及商务社交产业平台

标题: kafka的安装和基本操作 [打印本页]

作者: 杀鸡焉用牛刀    时间: 2023-6-8 08:11
标题: kafka的安装和基本操作
基本概念

简介

Kafka 最初是由 LinkedIn 即领英公司基于 Scala 和 Java 语言开发的分布式消息发布-订阅系统,现已捐献给Apache 软件基金会。其具有高吞吐、低延迟的特性,许多大数据实时流式处理系统比如 Storm、Spark、Flink等都能很好地与之集成。
总的来讲,Kafka 通常具有 3 重角色:
一句话概括:Kafka 是一个分布式的基于发布/订阅模式的消息中间件,在业界主要应用于大数据实时流式计算领域,起解耦合和削峰填谷的作用。
特点

Kafka在各种应用场景中,起到的作用可以归纳为这么几个术语:削峰填谷,解耦!
在大数据流式计算领域中,kafka主要作为计算系统的前置缓存和输出结果缓存;
安装部署

kafka基于Zookeeper, 因此需要先安装Zookeeper, 详见https://www.cnblogs.com/paopaoT/p/17461562.html
  1. tar -zxvf kafka_2.11-2.2.2.tgz tar  -C /opt/apps/
复制代码
  1. # 进入配置文件目录
  2. cd kafka_2.12-2.3.1/config
  3. # 编辑配置文件
  4. vi server.properties
  5. # 为依次增长的:0、1、2、3、4,集群中唯一 id
  6. broker.id=0
  7. # 数据存储的⽬录
  8. log.dirs=/opt/data/kafka
  9. # 底层存储的数据(日志)留存时长(默认7天)
  10. log.retention.hours=168
  11. # 底层存储的数据(日志)留存量(默认1G)
  12. log.retention.bytes=1073741824
  13. # 指定zk集群地址
  14. zookeeper.connect=linux01:2181,linux02:2181,linux03:2181
复制代码
  1. vi /etc/profile
  2. export KAFKA_HOME=/opt/apps/kafka_2.11-2.2.2
  3. export PATH=$PATH:$KAFKA_HOME/bin
  4. source /etc/profile
复制代码
  1. for  i  in {2..3}
  2. do
  3. scp  -r  kafka_2.11-2.2.2  linux0$i:$PWD
  4. done
  5. # 安装包分发后,记得修改config/server.properties中的 配置参数: broker.id
  6. # 注意:还需要分发环境变量
复制代码
  1. bin/kafka-server-start.sh -daemon /opt/apps/kafka_2.11-2.2.2/config/server.properties
  2. # 停止集群
  3. bin/kafka-server-stop.sh
复制代码
  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4.         for i in linux01 linux02 linux03
  5.         do
  6.         echo ---------- kafka $i 启动 ------------
  7.                 ssh $i "source /etc/profile;  /opt/app/kafka2.4.1/bin/kafka-server-start.sh -daemon /opt/app/kafka2.4.1/config/server.properties"
  8.         done
  9. };;
  10. "stop"){
  11.         for i in linux01 linux02 linux03
  12.         do
  13.         echo ---------- kafka $i 停止 ------------
  14.                 ssh $i "source /etc/profile;  /opt/app/kafka2.4.1/bin/kafka-server-stop.sh "
  15.         done
  16. };;
  17. esac
复制代码
基本操作

概述

Kafka 中提供了许多命令行工具(位于$KAFKA_HOME/bin 目录下)用于管理集群的变更。
脚本作用kafka-console-producer.sh生产消息kafka-topics.sh管理主题kafka-server-stop.sh关闭Kafka服务kafka-server-start.sh启动Kafka服务kafka-configs.sh配置管理kafka-consumer-perf-test.sh测试消费性能kafka-producer-perf-test.sh测试生产性能kafka-dump-log.sh查看数据日志内容kafka-preferred-replica-election.sh优先副本的选举kafka-reassign-partitions.sh分区重分配管理操作:kafka-topics

创建topic

--bootstrap-server 和 --zookeeper一样的效果 ,新版本建议使用 --bootstrap-server
  1. kafka-topics.sh   --bootstrap-server  linux01:9092,linux02:9092,linux03:9092    --create --topic test01  --partitions 3  --replication-factor  3
  2. 参数解释:
  3. --replication-factor  副本数量
  4. --partitions 分区数量
  5. --topic topic名称
  6. # 本方式,副本的存储位置是系统自动决定的
  7. # 手动指定分配方案:分区数,副本数,存储位置
  8. kafka-topics.sh --create --topic tpc-1  --zookeeper linux01:2181 --replica-assignment 0:1:3,1:2:6
  9. 该topic,将有如下partition:(2个分区 3个副本)
  10. partition0 ,所在节点: broker0、broker1、broker3
  11. partition1 ,所在节点: broker1、broker2、broker6
  12. # 查看topic的状态信息
  13. kafka-topics.sh --describe --topic tpc-1 --zookeeper linux01:2181
  14. Topic: tpc-1    PartitionCount: 2       ReplicationFactor: 3    Configs:
  15.         Topic: tpc-1    Partition: 0    Leader: 0       Replicas: 0,1,3 Isr: 0,1
  16.         Topic: tpc-1    Partition: 1    Leader: 1       Replicas: 1,2,6 Isr: 1,2
复制代码
查看topic列表
  1. kafka-topics.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --list
  2. kafka-topics.sh --list --zookeeper linux01:2181
  3. __consumer_offsets
  4. tpc-1
复制代码
查看topic状态信息
  1. kafka-topics.sh --describe --zookeeper linux01:2181  --topic test
  2. Topic: test     PartitionCount: 3       ReplicationFactor: 3    Configs:
  3.         Topic: test     Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
  4.         Topic: test     Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
  5.         Topic: test     Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
  6. # topic的分区数量,以及每个分区的副本数量,以及每个副本所在的broker节点,以及每个分区的leader副本所在broker节点,以及每个分区的ISR副本列表;
  7. # ISR: in  sync  replica ,同步副同步本(当然也包含leader自身,replica.lag.time.max.ms =30000)
  8. # OSR:out  of  sync replicas 失去同步的副本(该副本上次请求leader同步数据距现在的时间间隔超出配置阈值)
  9. # ISR同步副本列表
  10. # ISR概念:(同步副本)。每个分区的leader会维护一个ISR列表,ISR列表里面就是follower副本的Borker编号,只有跟得上Leader的 follower副本才能加入到 ISR里面
  11. # 这个是通过replica.lag.time.max.ms =30000(默认值)参数配置的,只有ISR里的成员才有被选为 leader 的可能。
复制代码
踢出ISR和重新加入ISR的条件:
删除topic
  1. bin/kafka-topics.sh --zookeeper linux01:2181 --delete --topic test
  2. # 删除topic,server.properties中需要一个参数处于启用状态: delete.topic.enable = true(默认是true)
  3. # 使用 kafka-topics .sh 脚本删除主题的行为本质上只是在 ZooKeeper 中的 /admin/delete_topics 路径下建一个与待删除主题同名的节点,以标记该主题为待删除的状态。然后由 Kafka控制器异步完成。
复制代码
增加分区数
  1. kafka-topics.sh --zookeeper linux01:2181 --alter --topic paopao --partitions 3
  2. # Kafka只支持增加分区,不支持减少分区
  3. # 原因是:减少分区,代价太大(数据的转移,日志段拼接合并)
  4. # 如果真的需要实现此功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去;
复制代码
动态配置topic参数(不常用)
  1. # 通过管理命令,可以为已创建的topic增加、修改、删除topic level参数
  2. # 添加/修改  指定topic的配置参数:
  3. kafka-topics.sh  --zookeeper linux01:2181  --alter  --topic tpc2 --config compression.type=gzip
  4. # --config compression.type=gzip  修改或添加参数配置
  5. # --add-config compression.type=gzip  添加参数配置
  6. # --delete-config compression.type  删除配置参数
复制代码
生产者:kafka-console-producer
  1. kafka-console-producer.sh --broker-list linux01:9092 --topic test01
  2. >a
  3. >b
  4. >c
  5. >hello
  6. >hi
  7. >hadoop
  8. >hive
复制代码
顺序轮询(老版本)

顺序分配,消息是均匀的分配给每个 partition,即每个分区存储一次消息,轮询策略是 Kafka Producer 提供的默认策略,如果你不使用指定的轮询策略的话,Kafka 默认会使用顺序轮训策略的方式。
随机分配

实现随机分配的代码只需要两行,如下
  1. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return ThreadLocalRandom.current().nextInt(partitions.size());
复制代码
消费者:kafka-console-consumer

消费者在消费的时候,需要指定要订阅的主题,还可以指定消费的起始偏移量
起始偏移量的指定策略有3中:
在命令行中,可以指定从什么地方开始消费
kafka的topic中的消息,是有序号的(序号叫消息偏移量),而且消息的偏移量是在各个partition中独立维护的,在各个分区内,都是从0开始递增编号!
  1. # 消费消息
  2. kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic test01 --from-beginning
  3. hive
  4. hello
  5. hadoop
  6. # 指定从最前面开始消费
  7. kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --from-beginning
  8. hadoop
  9. list
  10. hello
  11. kafka
  12. # 不指定他消费的位置的时候,就是从最新的地方开始消费
  13. kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao
  14. # 指定要消费的分区,和要消费的起始offset
  15. # 从指定的offset(需要指定偏移量和分区号)
  16. kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic paopao --offset 2 --partition 0
  17. yy
  18. abc
  19. 3333
  20. 2222
复制代码
消费组
如何让多个消费者组成一个组: 就是让这些消费者的groupId相同即可!
消费位移的记录

kafka的消费者,可以记录自己所消费到的消息偏移量,记录的这个偏移量就叫(消费位移);
记录这个消费到的位置,作用就在于消费者重启后可以接续上一次消费到位置来继续往后面消费;
消费位移,是组内共享的!!!消费位置记录在一个内置的topic中 ,默认是5s提交一次位移更新。
参数:auto.commit.interval.ms 默认是5s记录一次
  1. #  可以使用特定的工具类 解析内置记录偏移量的topic
  2. kafka-console-consumer.sh --bootstrap-server linux01:9092 --from-beginning --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
  3. # 通过指定formatter工具类,来对__consumer_offsets主题中的数据进行解析;
  4. [g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889851318, expireTimestamp=None)
  5. [g01,linux01,2]::OffsetAndMetadata(offset=17, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
  6. [g01,linux01,1]::OffsetAndMetadata(offset=13, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
  7. [g01,linux01,0]::OffsetAndMetadata(offset=14, leaderEpoch=Optional[0], metadata=, commitTimestamp=1659889856319, expireTimestamp=None)
  8. # 如果需要获取某个特定 consumer-group的消费偏移量信息,则需要计算该消费组的偏移量记录所在分区: Math.abs(groupID.hashCode()) % numPartitions(50)
  9. # 根据组id的hash取值%50 确定具体是将这个组具体每个分区消费到了哪里
  10. # __consumer_offsets的分区数为:50
复制代码
配置管理 kafka-config

kafka-configs.sh 脚本是专门用来进行动态参数配置操作的,这里的操作是运行状态修改原有的配置,如此可以达到动态变更的目的;一般情况下不会进行动态修改 。
动态配置的参数,会被存储在zookeeper上,因而是持久生效的
可用参数的查阅地址: https://kafka.apache.org/documentation/#configuration
  1. # kafka-configs.sh 脚本包含:变更alter、查看describe 这两种指令类型;
  2. # kafka-configs. sh 支持主题、 broker 、用户和客户端这4个类型的配置。
  3. # kafka-configs.sh 脚本使用 entity-type 参数来指定操作配置的类型,并且使 entity-name参数来指定操作配置的名称。
  4. # 比如查看topic的配置可以按如下方式执行:
  5. kafka-configs.sh --zookeeper linux01:2181  --describe  --entity-type topics  --entity-name paopao
  6. # 查看broker的动态配置可以按如下方式执行:
  7. kafka-configs.sh  --describe --entity-type brokers --entity-name 0 --zookeeper linux01:2181
复制代码
entity-type和entity-name的对应关系
  1. # 示例:添加topic级别参数
  2. kafka-configs.sh --zookeeper linux01:2181 --alter --entity-type topics --entity-name paopao --add-config cleanup.policy=compact,max.message.bytes=10000
  3. # 示例:添加broker参数
  4. kafka-configs.sh  --entity-type brokers --entity-name 0 --alter --add-config log.flush.interval.ms=1000 --bootstrap-server linux01:9092,linux02:9092,linux03:9092
复制代码
动态配置topic参数

通过管理命令,可以为已创建的topic增加、修改、删除topic level参数
添加/修改  指定topic的配置参数:
  1. kafka-topics.sh  --topic paopao --alter  --config compression.type=gzip --zookeeper linux01:2181
  2. # 如果利用 kafka-configs.sh 脚本来对topic、producer、consumer、broker等进行参数动态
  3. # 添加、修改配置参数
  4. kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --add-config compression.type=gzip
  5. # 删除配置参数
  6. kafka-configs.sh --zookeeper linux01:2181 --entity-type topics --entity-name paopao --alter --delete-config compression.type
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4