全网最具体的Kafka应用教程【建议收藏】

一给  金牌会员 | 2025-1-13 10:15:33 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 856|帖子 856|积分 2568

Kafka

kafka是什么?kafka仅仅是属于消息 中心件吗?

kafka在设计之初的时候 开发人员们在除了消息中心件以外,还想吧kafka设计为一个可以或许存储数据的体系,有点像常见的非关系型数据库,比如说NoSql等。除此之外 还希望kafka能支持连续变革,不绝增长的数据流, 可以发布 和订阅数据流,还可以对于这些数据进行生存
也就是说kafka的本质 是一个数据存储平台,流平台 , 只是他在做消息发布,消息消费的时候我们可以把他当做消息中心件来用。
而且kafka在设计之初就是采用分布式架构设计的, 基于集群的方式工作,且可以自由伸缩,所以 kafka构建集群非常简朴
根本概念:



  • Broker : 和AMQP里协议的概念一样, 就是消息中心件所在的服务器
  • Topic(主题) : 每条发布到Kafka集群的消息都有一个种别,这个种别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息固然生存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition(分区) : Partition是物理上的概念,体如今磁盘上面,每个Topic包含一个或多个Partition.
  • Producer : 负责发布消息到Kafka broker
  • Consumer : 消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group(消费者群组) : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
  • offset 偏移量: 是kafka用来确定消息是否被消费过的标识,在kafka内部体现就是一个递增的数字
kafka消息发送的时候 ,考虑到性能 可以采用打包方式发送, 也就是说 传统的消息是一条一条发送, 如今可以先把需要发送的消息缓存在客户端, 等到达肯定命值时, 再一起打包发送, 而且还可以对发送的数据进行压缩处理,淘汰在数据传输时的开销
kafka优缺点

长处:


  • ​ 基于磁盘的数据存储 ​
  • 高伸缩性 ​
  • 高性能
  • 应用场景 : 网络指标和日志 ​ 提交日志 流处理
缺点:


  • ​ 运维难度大 ​
  • 偶尔有数据杂乱的情况 ​
  • 对zookeeper强依靠 ​
  • 多副本模式下对带宽有肯定要求
kafka安装&启动

kafka安装的话,直接 从官网下载压缩包下来解压就可以了
注意的是, 启动kafka要先启动zookeeper kafka默认自带了zookeeper 可以启动他自带的 也可以本身别的利用
启动kafka需要实行 kafka-server-start.bat 文件 然后 需要传入一个路径参数 就是你server.config文件的地点 一般情况下传入../../config/server.properties 即可
刚刚提到的zookeeper kafka默认的zookeeper 启动的话启动zookeeper-server-start.bat文件即可 同样 要传入路径参数:../../config/zookeeper.properties
server 参数表明:

log.dirs: 日志文件存储地点, 可以设置多个
num.recovery.threads.per.data.dir:用来读取日志文件的线程数目,对应每一个log.dirs 若此参数为2 log.dirs 为2个目录 那么就会有4个线程来读取
auto.create.topics.enable:是否主动创建tiopic
num.partitions: 创建topic的时候主动创建多少个分区 (可以在创建topic的时候手动指定)
log.retention.hours: 日志文件保留时间 超时即删除
log.retention.bytes: 日志文件最大巨细
log.segment.bytes: 当日志文件达到肯定巨细时,开辟新的文件来存储(分片存储)
log.segment.ms: 同上 只是当达到肯定时间时 开辟新的文件
message.max.bytes: broker能吸收的最大消息巨细(单条) 默认1M
kafka根本管理操作命令:

##列出所有主题 kafka-topics.bat --zookeeper localhost:2181/kafka --list
##列出所有主题的具体信息 kafka-topics.bat --zookeeper localhost:2181/kafka --describe
##创建主题 主题名 my-topic,1副本,8分区 kafka-topics.bat --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 8 --topic my-topic
##增加分区,注意:分区无法被删除 kafka-topics.bat --zookeeper localhost:2181/kafka --alter --topic my-topic --partitions 16
##删除主题 kafka-topics.bat --zookeeper localhost:2181/kafka --delete --topic my-topic
##列出消费者群组(仅Linux) kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --list
##列出消费者群组具体信息(仅Linux) kafka-topics.sh --new-consumer --bootstrap-server localhost:9092/kafka --describe --group 群组名
kafka java客户端实战

引入maven依靠:
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4.     <version>0.11.0.0</version>
  5. </dependency>
复制代码
注意 我这里已经创建了一个叫 test-topic 的主题 如果你们没创建先创建后再实行代码
生产者:
  1. public class TestProducter {
  2.     public static void main(String[] args) throws  Exception{
  3.         Properties properties = new Properties();
  4.         //指定kafka服务器地址 如果是集群可以指定多个  但是就算只指定一个他也会去集群环境下寻找其他的节点地 址
  5.         properties.setProperty("bootstrap.servers","127.0.0.1:9092");
  6.         //key序列化器
  7.         properties.setProperty("key.serializer", StringSerializer.class.getName());
  8.         //value序列化器
  9.         properties.setProperty("value.serializer",StringSerializer.class.getName());
  10.         KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
  11.         ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<String, String>("test-topic",1,"testKey","hello");
  12.         Future<RecordMetadata> send = kafkaProducer.send(stringStringProducerRecord);
  13.         RecordMetadata recordMetadata = send.get();
  14.         System.out.println(recordMetadata);
  15.     }
  16. }
复制代码
消费者:
  1. public class TestCousmer {
  2.     public static void main(String[] args) {
  3.         Properties properties = new Properties();
  4.         properties.setProperty("bootstrap.servers","127.0.0.1:9092");
  5.         properties.setProperty("key.deserializer", StringDeserializer.class.getName());
  6.         properties.setProperty("value.deserializer",StringDeserializer.class.getName());
  7.         properties.setProperty("group.id","1111");
  8.         KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
  9.         consumer.subscribe(Collections.singletonList("test-topic"));
  10.         while (true){
  11.             ConsumerRecords<String, String> poll = consumer.poll(500);
  12.             for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
  13.                 System.out.println(stringStringConsumerRecord);
  14.             }
  15.         }
  16.     }
  17. }
复制代码
kafka生产者参数详解

acks:

​ 至少要多少个分区副本吸收到了消息返回确认消息 一般是 0:只要消息发送出去了就确认(不管是否失败) 1:只要 有一个broker吸收到了消息 就返回 all: 所有集群副本都吸收到了消息确认 当然 2 3 4 5 这种数字都可以, 就是具体多少台呆板吸收到了消息返回, 但是一般这种情况很少用到
buffer.memory:

​ 生产者缓存在本地的消息巨细 : 如果生产者在生产消息的速度过快 快过了往 broker发送消息的速度 那么就会出现buffer.memory不敷的题目 默认值为32M 注意 单元是byte 大概3355000左右
max.block.ms:

​ 生产者获取kafka元数据(集群数据,服务器数据等) 等待时间 : 当因网络原因导致客户端与服务器通讯时等待的时间高出此值时 会抛出一个TimeOutExctption 默认值为 60000ms
retries:

​ 设置生产者生产消息失败后重试的次数 默认值 3次
retry.backoff.ms:

​ 设置生产者每次重试的隔断 默认 100ms
batch.size:

​ 生产者批次发送消息的巨细 默认16k 注意单元还是byte
linger.ms:

​ 生产者生产消息后等待多少毫秒发送到broker 与batch.size 谁先到达就根据谁 默认值为0
compression.type:

​ kafka在压缩数据时利用的压缩算法 可选参数有:none、gzip、snappy none即不压缩 gzip,和snappy压缩算法之间弃取的话 gzip压缩率比较高 体系cpu占用比较大 但是带来的好处是 网络带宽占用少, snappy压缩比没有gzip高 cpu占用率不是很高 性能也还行, 如果网络带宽比较告急的话 可以选择gzip 一般推荐snappy
client.id:

​ 一个标识, 可以用来标识消息来自哪, 不影响kafka消息生产
max.in.flight.requests.per.connection:

​ 指定kafka一次发送请求在得到服务器回应之前,可发送的消息数目
偏移量与偏移量提交

​ 偏移量是kafka特别重要的一个概念特别是在消费者端, 我们之前也有简朴提到过偏移量是拿来干嘛的.
偏移量是一个自增长的ID 用来标识当前分区的哪些消息被消费过了, 这个ID会生存在kafka的broker当中 而且 消费者本地也会存储一份 因为每次消费每一条消息都要更新一下偏移量的话 不免会影响整个broker的吞吐量 所以一般 这个偏移量在每次发生改动时 先由消费者本地改动, 默认情况下 消费者每五秒钟会提交一次改动的偏移量, 这样做固然说吞吐量上来了, 但是大概会出现重复消费的题目: 因为大概在下一次提交偏移量之前 消费者本地消费了一些消息,然后发生了分区再均衡(分区再均衡在下面有讲) 那么就会出现一个题目 假设上次提交的偏移量是 2000 在下一次提交之前 其实消费者又消费了500条数据 也就是说当前的偏移量应该是2500 但是这个2500只在消费者本地, 也就是说 假设其他消费者去消费这个分区的时候拿到的偏移量是2000 那么又会从2000开始消费消息 那么 2000到2500之间的消息又会被消费一遍,这就是重复消费的题目.
kafka对于这种题目也提供了解决方案:手动提交
你可以关闭默认的主动提交(enable.auto.commit= false) 然后利用kafka提供的API来进行偏移量提交: 卡夫卡提供了两种方式提交你的偏移量 :同步和异步
  1. //同步提交偏移量
  2. kafkaConsumer.commitSync();
  3. //异步提交偏移量
  4. kafkaConsumer.commitAsync();
复制代码
他们之间的区别在于 同步提交偏移量会等待服务器应答 而且遇到错误会尝试重试,但是会肯定程度上影响性能不过能确保偏移量到底提交成功与否
而异步提交的对于性能肯定是有提示的 但是毛病也就像我们刚刚所提到 遇到错误没办法重试 因为大概在收到你这个效果的时候又提交过偏移量了 如果这时候重试 又会导致消息重复的题目了..
其实 我们可以采用同步+异步的方式来包管提交的正确性以及服务器的性能
因为 异步提交的话 如果出现题目但是不是致命题目的话 大概下一次提交就不会出现这个题目了, 所以 有些异常是不需要解决的(大概单纯的就是网络抽风了呢? ) 所以 我们平常可以采用异步提交的方式 等到消费者停止了(遇到了致命题目,或是强制停止消费者) 的时候再利用同步提交(因为这次如果失败了就没有下次了... 所以要让他重试) 。
具体代码:
  1. try {
  2.     while (true) {
  3.         ConsumerRecords<String, String> poll = kafkaConsumer.poll(500);
  4.         for (ConsumerRecord<String, String> context : poll) {
  5.             System.out.println("消息所在分区:" + context.partition() + "-消息的偏移量:" + context.offset()
  6.                     + "key:" + context.key() + "value:" + context.value());
  7.         }
  8.         //正常情况异步提交
  9.          kafkaConsumer.commitAsync();
  10.     }
  11. } catch (Exception e) {
  12.     e.printStackTrace();
  13. } finally {
  14.     try {
  15.         //当程序中断时同步提交
  16.         kafkaConsumer.commitSync();
  17.     } catch (Exception e) {
  18.         e.printStackTrace();
  19.     } finally {
  20.         //关闭当前消费者  具体在下面有讲
  21.         kafkaConsumer.close();
  22.     }
  23. }
复制代码
值得一提的是 在手动提交时kafka提供了你可以传入具体的偏移量来完成提交 也就是指定偏移量提交,但是非常不建议手动指定 因为如果指定的偏移量 小于 分区所存储的偏移量巨细的话 那么会导致消息重复消费, 如果指定的偏移量大于分区所存储的偏移量的话,那么会导致消息丢失.
代码:
  1. Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
  2. //我这里就指定了test-topic这个主题下的分区1  OffsetAndMetadata:第一个参数为你要提交的偏移量 第二个参数可以选择性的传入业务ID 可以拿来确定这次提交  这里我直接提交偏移量为0 那么会导致下个消费者或者说分区再均衡之后再来读取这个分区的数据会从第一条开始读取
  3. offset.put(new TopicPartition("test-topic", 1), new OffsetAndMetadata(0, "1"));
  4. //指定偏移量提交 参数为map集合  key为指定的主题下的分区,value 为你要提交的偏移量
  5. kafkaConsumer.commitSync(offset);
复制代码
Rebalance 分区再均衡

这也是kafka里面非常重要的一个概念
起首 Rebalance 是一个操作 在以下情况下会触发Rebalance 操作:

  • 组成员发生变更(新consumer到场组、已有consumer主动离开组或已有consumer崩溃了)
  • 订阅主题数发生变更,如果你利用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
  • 订阅主题的分区数发生变更
当触发Rebalance kafka重新分配分区所有权
何为分区所有权? 我们之前有提到过, 消费者有一个消费者组的概念, 而且一个消费者组在消费一个主题时有以下规则 一个消费者可以消费多个分区 但是一个分区只能被一个消费者消费 如果 我有分区 0 1 2 如今有消费者 A,B 那么 kafka大概会让消费者A 消费 0,1 这2个分区 那么 这时候 我们就会说 消费者A 拥有 分区 0,1的所有权。
当触发 Rebalance 的时候 kafka会重新分配这个所有权 还是基于刚刚的比方 消费者A 拥有 0 和1 的所有权 消费者B 会有2的所有权 当消费者B离开kafka的时候 这时候 kafka会重新分配一下所有权 此时 整个消费者组只有一个A 那么 0 1 2 三个分区的所有权都会属于A 同理 如果这时候有消费者C进入这个消费者组 那么 这时候kafka会确保每一个消费者都能消费一个分区.
当触发Rebalance时 由于kafka正在分配所有权 会导致消费者不能消费, 而且 还会引发一个重复消费的题目, 当
消费者还没来得及提交偏移量时 分区所有权遭到了重新分配 那么这时候就会导致一个消息被多个消费者重复消费
那么 解决方案就是 在消费者订阅时, 添加一个再均衡监听器, 也就是当kafka在做Rebalance 操作前后 均会调用再均衡监听器 那么这时候 我们可以在kafka Rebalance之条件交我们消费者最后处理的消息来解决这个题目。
Close():

当我们不需要某个消费者继续消费kafka当中的数据时, 我们可以选择调用Close方法来关闭它,在关闭之前 close方法会发送一个通知告诉kafka我这个消费者要退出了, 那么 kafka就会准备Rebalance 而且如果是采用的主动提交偏移量 消费者自身也会在关闭本身之条件交最后所消费的偏移量 。
当然 即使没有调用close方法 而是直接强制停止了消费者的进程 kafka也会根据我们后面会说到的体系参数捕获到消费者退出了。

独立消费者:

kafka支持这样的需求: 大概你的消费者不想订阅某个主题 也不想到场什么消费组 只想订阅某个(多个)主题下的某个(多个)分区。
那么可以采用分配的方式, 而不是订阅 , 我们之前讲的都是基于消费组订阅某个主题来完成消息的消费, 那么你订阅的主题有哪些分区的消息是属于你的 这个是kafka来分配的 而不是你本身决定的 那么我们可以换为本身分配的方式来完成消息的消费:
  1. List<TopicPartition> list = new ArrayList<>();
  2. //new出一个分区对象 声明这个分区是哪个topic下面的哪个分区
  3. list.add(new TopicPartition("test-topic",0));
  4. //分配这个消费者所需要消费的分区, 传入一个分区对象集合
  5. kafkaConsumer.assign(list);
复制代码
消费者参数:

fetch.min.bytes:

​ 该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的巨细,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活泼的时候(或者一天里的低谷时段)就不需要来往返回地处理消息。如果没有许多可用数据,但消费者的 CPU 利用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数目比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。 默认值为1 byte
fetch.max.wait.ms

​ 我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 feth.max.wait.ms 则用于指定 broker 的等待时间,默认是如果没有足够的数据流入Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的耽误。如果 fetch.max.wait.ms 被设为 100ms,而且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。 默认值为500ms
max.partition.fetch.bytes

​ 该属性指定了服务器从每个分区里返回给消费者的最大字节数。默认值是 1MB
session.timeout.ms 和heartbeat.interval.ms

session.timeout.ms :

​ 消费者多久没有发送心跳给服务器服务器则以为消费者殒命/退出消费者组 默认值:10000ms
heartbeat.interval.ms :

​ 消费者往kafka服务器发送心跳的隔断 一般设置为session.timeout.ms的三分之一 默认值:3000ms
auto.offset.reset:

​ 当消费者本地没有对应分区的offset时 会根据此参数做不同的处理 默认值为:latest
earliest

​ 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,重新开始消费
latest

​ 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none

​ topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
enable.auto.commit

​ 该属性指定了消费者是否主动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由本身控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
partition.assignment.strategy

​ PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。


  • ​ Range:该策略会把主题的多少个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,而且每个主题有 3 个分区。那么消费者 C1 有大概分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要利用了 Range 策略,而且分区数目无法被消费者数目整除,就会出现这种情况。
  • ​ RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果利用 RoundRobin 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数目的分区(或最多就差一个分区)。
max.poll.records

  1. 单次调用 poll() 方法最多能够返回的记录条数 ,默认值 500
复制代码
receive.buffer.bytes 和 send.buffer.bytes

​ receive.buffer.bytes 默认值 64k 单元 bytes
​ send.buffer.bytes 默认值 128k 单元 bytes
​ 这两个参数分别指定了 TCP socket 吸收和发送数据包的缓冲区巨细。如果它们被设为 -1
利用java来操作kafka管理命令

​ 起首 得引入一个依靠:

  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka_2.10</artifactId>
  4.     <version>0.10.2.1</version>
  5. </dependency>
复制代码
我们之前所引入的依靠是kafka客户端的依靠 这个是别的的依靠 不是一回事
创建topic

  1. public static void createTopic(){
  2.     ZkUtils zkUtils = ZkUtils.apply("localhost:2181/kafka", 30000, 30000, JaasUtils.isZkSecurityEnabled());
  3.     System.out.println(JaasUtils.isZkSecurityEnabled());
  4.     AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), AdminUtils.createTopic$default$6());
  5.     zkUtils.close();
  6. }
复制代码
删除topic

  1. public static  void deleteTopic(){
  2.     ZkUtils zkUtils = ZkUtils.apply("localhost:2181/kafka", 30000, 30000, JaasUtils.isZkSecurityEnabled());
  3.     AdminUtils.deleteTopic(zkUtils, "t1");
  4.     zkUtils.close();
  5. }
复制代码
列出所有topic

  1. public  static void listTopic(){
  2.     ZkUtils zkUtils = ZkUtils.apply("localhost:2181/kafka", 30000, 30000, JaasUtils.isZkSecurityEnabled());
  3.     List<String> list = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
  4.     for (String s : list) {
  5.         System.out.println(s);
  6.     }
  7.     zkUtils.close();
  8. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

一给

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表