kafka
kafka设置
环境准备
操纵系统:liunx系统
CPU尽量选择32核以上
根据数目产生的多少选择服务器台数
内存至少16G
摆设kafka
先摆设zookeeper,再摆设kafka
流程启动
概念介绍:
broker
使用kafka一样平常会启动多个kafka服务摆设到多台呆板,每台呆板上面的kafka服务就称为broker
controller
Kafka是分布式消息传输系统,所以存在多个Broker服务节点,但是它的软件架构接纳的是分布式系统中比较常见的主从(Master - Slave)架构,也就是说需要从多个Broker中找到一个用于管理整个Kafka集群的Master节点,这个节点,我们就称之为Controller。
其中Controller的选举是依赖zookeeper实现的
作用
1.broker管理:broker数目变化,数据变化
2.Topic管理:增删改Topic
3.partition管理:监听副本,isr等变化
4.数据服务
5.启动分区状态机和副本状态机
zookeeper
作用:管理集群,启动kafka之前需要先启用zookeeper,zookeeper协助kafka管理集群
主题创建
1.创建主题
2.通过命令提交指令,设置主题名称,分区,副本,副天职配策略等信息
3.指令会提交到kafka客户端
4.客户端会对指令进行校验
5.封装主题
6.让controller发起对该主题的创建请求
7.controller吸收到请求后转发给kafkaApis
8.然后kafkaApis根据下面的流程创建主题,由controller来管理主题以及分区副本等信息
生产数据
流程
1.通过producerRecord生产数据
2.interceptors拦截器:生产数据,同一经过拦截器,可以对一些没有的数据进行拦截
3.MetadataCache 元数据缓存:通过连接的boker获取集群信息,放入metadata进行缓存(这也是bootstrap.servers设置中只需要连接一个broker就可以知道其他broker的信息)
5.key/valueSerializer 序列化:序列化是将对象转换为字节数组(byte array)以便通过网络传输或存储的过程。在Kafka中,生产者将消息发送到主题(topic)时,消息的键和值都需要以字节形式进行传输,因此必须进行序列化。
6.partitioner分区器:数据需要发送到boker,分区存储在分区器中,这里分区器就是判断数据发送到哪个boker中
7.数据校验:校验数据是否巨细,缓存是否正常
8.缓冲区bufferPool:buffer.memory=32m缓存32m,数据不是来一条就发一条,数据发送是按批次,先进缓冲区,然后进入Deque队列(这里队列是按照topic划分),当达到batch.size=16k巨细之后才会发送
9.sender发送线程:缓冲区有数据后,sender会对缓冲区内里的数据进行拉取,然后以boker为单位进行发送
10.sender发送数据发送到selectoer,通过网络数据流发送到broker
详细如下
拦截器
1.界说一个拦截器类实现producerInterceptor接口
2.重写onSend方法(在成产发送数据后会调用该方法),传入参数和返回参数都是ProducerRecord,对key,value重新界说
分区器
producerRecord方法中有一个onSend方法发送数据,onSend中有个partition方法进行分区处理
1.如果指定了分区号,分区默认处理就是直接返回=传入的分区参数
题目:如果分了8个区,传入分区参数大于8的时候,就会卡住,因为在分区的时候没有校验,导致
2.没指定分区号,但是指定了分区器,则会走partitioner.partiton方法根据key/valkue等数据进行分区,这里需要自界说分区器界说一个类实现partitioner接口,重写方法,并在kafka参数中设置分区参数,实现的哪个类
3.如果没分区号,而且没有指定分区器,则会根据序列化key做散列运行并和当前topic的分区数目取余得到分区号
4.如果以上3步都没指定,则会返回一个未知分支,在数据网络的时候再算出分区
数据网络器
数据网络器内里有一个缓冲区来缓冲数据,缓冲区巨细为32m,数据进去数据库网络器之前还会进行数据校验,看发送的数据是否大于32m,kafka生产数据并不是生产一条就发送一条,而是再数据库网络器中进行分组,分到一批数据为batch.size=16k之后再发送,这样为了提高服从,减小资源斲丧,减少频繁的网络传输
题目:如果传入的数据凌驾的16k,那么再传输队列中会将这批数据放入队列,代表这是一批完备的数据,并发送,并不是一定要构成16k,如果2批数据,第一次的数据2k,第二次数据20k,则会将者2次的数据构成一批发送
这里还有个时间阈值,当批次数据没到16k第二批次对象整合数据凌驾期间阈值也会发送
数据网络器组装完数据后通过Sender线程进行发送,这里Sender线程又会对发送过来的数据进行组装,根据broker节点重新整合数据进行传输
异步和同步发送
发送数据有个回调函数,来判断数据是否发送成功
异步发送:会先将主线程发送的数据给发送到缓冲区,后续就可以再次进行发送,但是缓冲的数据还有由sender线程从缓冲区取代码进行发送存储并消费
同步发送:JDK1.5增加的JUC并发编程的Future接口的get方法实现。
- // TODO 发送数据
- producer.send(record, new Callback() {
- // TODO 回调对象
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- // TODO 当数据发送成功后,会回调此方法
- System.out.println("数据发送成功:" + recordMetadata.timestamp());
- }
- }).get();
复制代码 对比:异步发送服从高,但是不安全
同步发送服从低,安全
ACK应答机制
kafka生产数据时候为了提高数据可靠性,提供了应答机制,其中应答机制分为下面三类
ACK=-1(All):默认,数据生产之后会发送到broker,且全部副本都吸收,才会进行下一条的发送
ACK=1:当leader吸收就会响应,这里安全会比0好,但是比-1差,因为当leader出现题目,但是还没有同步到follower中,这时候就出现题目,数据不会丢失,但是会重复
ACK=0 :发送的缓冲区就可以直接响应,安全级别最低,数据不会重复,但是大概丢失
数据重复及乱序
由于网络或服务节点的故障,Kafka在传输数据时,大概会导致数据丢失,所以我们才会设置ACK应答机制,尽大概提高数据的可靠性,
但是设置了应答机制是保证数据不会丢失,但是其中的重试机制会导致数据的重复以及乱序
数据重复
当数据发送到broker之后,由于网络等影响导致broker没有返回ACK响应到producer,然后producer会进行重试,网络客户端会将数据返回给缓冲区,然后缓冲区重试重新发送给sender发送给网络客户端,发送给broker,但是发送的之前,broker重新建立连接了,内里已经有之前的数据,导致这时候吸收的数据重复
数据乱序
重试机制会导致乱序,当a,b,c多条数据发送给broker,其中最开始次序应该是a,b,c,但是数据c因为网络等印象没有响应,导致这条数据会进行重试,重新发送到broker这时候次序就改变成c,a,b
幂等性办理重复以及乱序
幂等性操纵要求:
1.ACKS=-1
2.开启重试
3.在途请求缓冲区的数目不能大于5 默认=5
4.开启幂等性
开启幂等性之后:这意味着Kafka会在发送请求时生成唯一的生产者ID(producer ID)和消息序列号(sequence number)用于确保每条消息的唯一性。
题目:无法实现多分区幂等性,以及跨会话幂等性
为什么能办理重复和乱序
传入a,b,c,d,e5条数据,因为开启幂等性之后会有唯一生产者ID(producer ID)和消息序列号(sequence number),这时候先到bufferpool缓冲,比及到了16k,或者时间就会向kafka发送数据,发送过程数据会先进入在途缓冲区,这里最大是5条数据,刚好是abcde5条,然后异步发送给到kafka磁盘中的log文件,并且kafka中的broker需要对数据进行应答ack的响应,如果其中c数据由于网络波动没有响应,这时候在途缓冲区中的c数据会保存并重新发送到broker,然后判断id和number来保证不重复和次序
开启变乱
变乱不能办理跨分区幂等性题目,变乱是针对一个分区来执行的,变乱只能办理跨会话幂等性题目
变乱传输语义
at most once 最多一次:不管是否能吸收到,数据最多只传一次。这样数据大概会丢失, Socket, ACK=0
at least once 最少一次:消息不会丢失,如果吸收不到,那么就继续发,所以会发送多次,直到收到为止,有大概出现数据重复 ACK=1
Exactly once 精准一次:消息只会一次,不会丢,也不会重复。 幂等+变乱+ACK=-1
存储数据
数据存储格式
数据已经过生产者Producer发送给Kafka集群,当Kafka吸收到数据后,会将数据写入本地文件中。
这里会产生多组文件,每组有3类文件
1.(.log)文件:存储数据key/value值,位置等信息
2.(.index)文件:存储逻辑偏移量以及对应的物理位置
3.(.timesIndex)文件:存储时间以及对应的位置
log
数据日记文件到达1G才会滚动生产新的文件
log.segment.bytes:kafka设置参数之一,表示每个日记段最大是多少字节,默认1G,达到这个参数设置的值之后才会参数新的log文件
log.segment.ms:默认值7天,表示日记段最大存在的时间,如果达到7天之后kafka会关闭当前段并生成新的日记段
通常由这两个参数控制日记段巨细
如果保证日记段删除:
log.retention.bytes:指定主题全部日记段统共可以占用最大字节数,当凌驾最大字节的时候会删除最旧的日记段
log.retention.ms:这个参数指定了日记段最大保存时间,凌驾这个时间kafka会删除该段
可以看到数据是一批一批的这里默认是batch.size=16K的巨细,不外本身也可以设置
log文件中的日记信息
数据项 含义
baseOffset 当前batch中第一条消息的位移
lastOffset 最新消息的位移相对于第一条消息的唯一增量
count 当前batch有的数据数目,kafka在进行消息遍历的时候,可以通过该字段快速的跳跃到下一个batch进行数据读取
partitionLeaderEpoch 记录了当前消息所在分区的 leader 的服务器版本(纪元),主要用于进行一些数据版本的校验和转换工作
crc 当前整个batch的数据crc校验码,主要用于对数据进行差错校验的
compresscode 数据压缩格式,主要有GZIP、LZ4、Snappy、zstd四种
baseSequence 当前批次中的基础序列号
lastSequence 当前批次中的最后一个序列号
producerId 生产者ID
producerEpoch 记录了当前消息所在分区的Producer的服务器版本(纪元)
isTransactional 是否开启变乱
magic 魔数(Kafka服务步伐协议版本号)
CreateTime(data) 数据创建的时间戳
isControl 控制类数据(produce的数据为false,变乱Marker为true)
compresscodec 压缩格式,默认无
isvalid 数据是否有用
offset 数据偏移量,从0开始
key 数据key
payload 数据value
sequence 当前批次中数据的序列号
CreateTime(header) 当前批次中最后一条数据的创建时间戳
index
索引文件中保存的就是逻辑偏移量和数据物理存储位置(偏移量)的对应关系
因为每个log文件最大存储1G文件,log文件是通过position来定位数据,偏移量是告诉我们保存的次序,但是我们可以通过偏移量盘算出position,因为每批数据的巨细我们是确定的。这里kafka提供了index索引文件
看图可以index文件中存储的是偏移量以及对应的物理位置
这时候可以根据偏移量拿到物理位置position
但是index文件存储的偏移量和positon并不是完备的,为了服从,并没将全部索引关系都保存下来,全部存下来的数据量也不小,kafka可以根据二分法来查询到具体position
timeIndex
时间索引文件,偶然候我们不想根据偏移量获取物理位置,这时候kafka还提供了时间索引文件,时间对应的偏移量,再通过偏移量获取postion
数据刷写
在Linux系统中,当我们把数据写入文件系统之后,实在数据在操纵系统的PageCache(页缓冲)内里,并没有刷到磁盘上生成log,index等文件。如果操纵系统挂了,数据就丢失了。
举例:
如果kafka生产者生产了一批数据,立马检察log文件,会发现内里没有数据,这是因为kafka直接写入磁盘的服从太低,leader副本的操纵是先将数据存操纵系统的PageCache(页缓冲)内里,再通过组件LogManager将缓存中的数据刷写入磁盘
这里的设置参数:
log.flush.interval.messages :达到消息数目时,会将数据flush到日记文件中。
log.flush.interval.ms :间隔多少时间(ms),执行一次强制的flush操纵。
flush.scheduler.interval.ms:全部日记刷新到磁盘的频率
官方不建议通过上述的三个参数来强制写盘,数据的可靠性应该通过replica来保证,而强制flush数据到磁盘会对团体性能产生影响。
kafka官方建议应当利用操纵系统背景来刷写缓存,而不是利用这三个参数强制写缓存
副本同步
kafka创建主题的时候会有多个分区,分区会有多个副本,差别的副本会选举成为leader和follower
Follower节点会启动数据同步线程ReplicaFetcherThread,从Leader副本节点同步数据。
线程运行后,会不停重复两个操纵:截断(truncate)和抓取(fetch)。
截断:为了保证分区副本的数据同等性,当分区存在Leader Epoch(leader纪元)值时,会将副本的本地日记截断到Leader Epoch对应的最新位移处.如果分区不存在对应的 Leader Epoch 记录,那么依然使用原来的高水位机制,将日记调整到高水位值处。
抓取:向Leader同步最新的数据。
这里引入3个概念
HW:高水位值(High Watermark)当多个副本数据差别等,取共有最新的数据作为水位线,消费者只能看见HW之前的数据
LEO:日记末了位移(Log End Offset),表示下一条待写入消息的offset,每个分区副本都会记录本身的LEO。对于Follower副本而言,它能读取到Leader副本 LEO 值以下的全部消息。
LSO:起始偏移量(Log Start Offset),每个分区副本都有起始偏移量,用于表示副本数据的起始偏移位置,初始值为0。
LSO一样平常环境下是无需更新的,但是如果数据过期,或用户手动删除数据时,Leader的Log Start Offset大概发生变化,Follower副本的日记需要和Leader保持严格的同等,因此,如果Leader的该值发生变化,Follower天然也要发生变化保持同等。
举例
这里ABC三个副本,Aleader副本值1,2,3,4 BFollower副本值1,2 C副本值1,2,3,HW是2 如果ALeader副本挂了,kafka会重新选举leader 这时候选举B副本作为leader,B中的数据是1,2 C中的数据是1,2,3,所以C会进行截断,C中的数据也成1,2
数据同等性
kafka设计目的是高吞吐,高并发,高性能,为了做到以上三点,它必须设计因素布式,多台呆板可以同时提供读写,并且需要为数据的存储做冗余备份
这里引入HW概念
HW:水位线,当一个分区有多个副本,必然会有leader和follower,其中follower会向leader同步数据,但是因为每个副本都在差别的broker上,follower从leader节点上同步是速率是不一样的,因为leader的数据大概是ABCD ,follower1同步的数据是ABC,follower2同步的数据是AB,HW相当于木桶原理,取最短的数据的最新值,或者说是共有有的在最新数据就是HW,消费者消费的时候也最多只能消费到HW之前的数据,这样就保证了消费同等性
ISR同步副本列表
ISR中包含了所以副本,第一个副本代表leader,当leader挂掉了,会重新选举leader,其中leader就是 ISR中的第一位,如果之前的副本规复之后,又会加入ISR中排到最后进行排队
消费数据
消费者消费数据方式:pull拉取&&push推送
kafka接纳消费者pull拉取kafka中的数据
因为对于差别消费者消费能力不一样,如果是kafka推送,则大概会出现某些消费者出现消费不赢,消息堆积等环境,而接纳pull方式者可以根据消费者自身的处理能力来消费数据
拉取数据 偏移量题目
kafka拉取数据默认是LEO拉取最新的数据
可以设置Auto_offset_reset_config=earliest 最早的偏移量
一旦消费者提交了偏移量,那么kafka会优先使用提交的偏移量进行消费。此时,auto.offset.reset参数是不起作用的。
题目点:为什么一个消费者知道本身的偏移量,重启之后为什么能继续从之前的地方进行消费
解答:主动提交时候kakfa会每隔一段时间会记录当前偏移量,手动提交就不用说了,如果设置的是earliest当第一次消费是消费最早的数据,第二次或者重启之后,消费的数据是上次提交的偏移量位置,之前的设置就失效了
主动提交(重复消费)
偏移量主动提交会出现消费数据重复题目
办理:1.修改主动保存偏移量的时间,但是时间再小也会出现重复消费题目,
2.使用手动提交
上面两种方案都只能说是减少数据重复题目,甚至手动提交会出现漏消费的环境
手动同步提交&&异步提交
主动提交改为手动提交偏移量,这个可以尽大概办理重复消费题目,但是大概出现漏消费题目
(当偏移量提交了,但是步伐中的数据还没处理过来就重启了,导致漏消费)
同步提交:阻塞当前线程,更可靠
异步提交:不会阻塞当前线程,吞吐量较高
为什么会重复消费
在Consumer消费的过程中,应用步伐被强制kill掉或者宕机
消费者组的重新平衡
缘故原由:默认poll处理时间是5分组,如果凌驾5分钟还没处理好数据,这消费者会触发Partition Balance机制,导致该分区的会重新分配,重新分配的有会拉取之前的数据
办理方法:
设置公道的 max.poll.interval.ms,使得消费者有足够时间处理消息,制止重平衡的发生。
在消费过程中使用幂等性机制,确保即使发生重复消费,结果也不会出现非常。
办理重复消费和漏消费环境
如何完全办理重复消费和漏消费题目
1.让拉取数据后的数据处理和偏移量提交进行一个原子绑定
方案一:开启kafka变乱和数据库变乱使之原子绑定
方案二:将消费接口进行幂等性处理(幂等消费 的焦点思想是:即使同一条消息被多次消费,消费的结果也应该是相同的。这样,重复消费 不会对系统状态造成影响,也不会导致数据差别等。)
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.errors.WakeupException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.List;
- import java.util.Properties;
- public class KafkaConsumerWithDurableOffset {
- public static void main(String[] args) {
- // Kafka 配置
- String bootstrapServers = "localhost:9092";
- String groupId = "test-group";
- String topic = "test-topic";
- // 配置消费者属性
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 禁用自动提交偏移量
- // 创建 Kafka 消费者实例
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
- // 订阅主题
- consumer.subscribe(List.of(topic));
- // 处理消费消息
- try {
- while (true) {
- var records = consumer.poll(java.time.Duration.ofMillis(100)); // 设置轮询时间
- // 处理每个消息
- for (var record : records) {
- try {
- // 模拟消息处理过程
- System.out.printf("Consumed record: Key = %s, Value = %s, Offset = %d\n",
- record.key(), record.value(), record.offset());
-
- // 在这里处理消息,例如将消息保存到数据库
- boolean isProcessed = processMessage(record);
-
- if (isProcessed) {
- // 只有消息成功处理后才提交偏移量
- consumer.commitSync(); // 手动提交偏移量
- } else {
- // 处理失败时,可以选择不提交偏移量,确保消息会被重试
- System.err.println("Error processing record, will retry");
- }
- } catch (Exception e) {
- // 处理消费失败的情况
- System.err.println("Error processing record: " + e.getMessage());
- // 消费失败时可以选择不提交偏移量,确保消息重试
- }
- }
- }
- } catch (WakeupException e) {
- // 处理中断消费时的异常
- System.out.println("Consumer interrupted");
- } finally {
- // 在退出时关闭消费者
- consumer.close();
- }
- }
- // 处理消息,保存到数据库等持久化存储
- private static boolean processMessage(org.apache.kafka.clients.consumer.ConsumerRecord<String, String> record) {
- // 这里假设我们把消息保存到数据库
- try {
- // 模拟数据库操作,保存消息
- // e.g., insertRecordIntoDatabase(record);
- // 如果保存成功返回 true
- return true;
- } catch (Exception e) {
- // 如果保存失败,返回 false
- return false;
- }
- }
- }
复制代码 变乱隔离级别
ISOLATION_LEVEL_CONFIG
默认read_uncmmitted 未提交读,读取已经提交变乱成功和未提交变乱成功的数据
默认read_committed 已提交读,读取已经提交变乱成功的数据(默认)
消费者组
一个主题有8个分区,可以有大于分区数目的消费者,当一个消费者组的时候,一个分区只能有一个消费者消费,多余的消费者会当备份,无法进行消费
当其中一个消费者挂掉,备份消费者会顶替之前的消费者进行消费,这里会继续之前的偏移量进行消费
如何找到偏移量的?
kafka在内部创建了一个consumer_offsets主题来保存消息的偏移量
这里偏移量的获取是kafka底层直接处理的
消费者分配策略
Kafka消费者默认的分区分配就是RangeAssignor,CooperativeStickyAssignor
RoundRobinAssignor(轮询分配策略)
每个消费者组中的消费者都会含有一个主动生产的UUID作为memberid。
轮询策略中会将每个消费者按照memberid进行排序,全部member消费的主题分区根据主题名称进行排序。
缺点:不是很平衡
RangeAssignor(范围分配策略)
缺点:Range分配策略针对单个Topic的环境下显得比较平衡,但是假如Topic多的话, member排序靠前的大概会比member排序靠后的负载多许多。是不是也不够抱负。
StickyAssignor(粘性分区)
在第一次分配后,每个构成员都保存分配给本身的分区信息。如果有消费者加入或退出,那么在进行分区再分配时(一样平常环境下,消费者退出45s后,才会进行再分配,因为需要考虑大概又规复的环境),尽大概保证消费者原有的分区稳定,重新对加入或退出消费者的分区进行分配。
粘性分区分配策略分配的会更加均匀和高效一些。
总结:尽大概最少改动分区
CooperativeStickyAssignor
优化事后的粘性分区
消费者的leader选取
1.当第一个消费者加入组进行消费的时候这个消费者就是leader
2.当这个组的第二个消费者加入组之后,需要重新选举leader
3.第三个消费者加入组之后,前面两个消费者都要重新加入组和第三个消费者来竞争leader
leader的作用:leader分配消费者对应消费哪个分区
集群脑裂题目
zookeeper管理kafka集群,第一个加入zookeeper的broker的是controller,来管理其他broker
题目:当A(broker)controller挂掉之后(网络不稳定之类),其他broker(B)会选出一个当controller,这时候A规复之后,谁是集群的管理者?
答:集群中有个epoch(纪元),也就是第几任的意思,每当更新controller之后,epoch就会+1,管理者取最新的epoch,也就是B会继续当controller
ISR
AR:所以副本的聚集
OSR:没有同步的副本聚集
ISR:正在同步的副本聚集
当副本和leader副本差距过大会移除ISR,加入OSR,当OSR中的副本跟上leader数据之后才会加入ISR,这样是确保leader挂了之后从ISR中选取的副本是同步的
留意:分区数可以大于节点数,但副本数不能大于节点数。因为副本需要分布在差别的节点上,才能达到备份的目的。
数据有序性
数据有序主要是三个方面
生产有序:开启幂等性让数据保持有序,kafka底层在在途缓冲区进行数据比对,一旦数据次序差别等则会让生产者重新发送,如果数据发送到多个分区,是无法保证次序的
存储有序:存储有序指的是kafka的服务端获取数据后会将数据次序写入日记文件
消费有序:存储数据时会给数据增加一个访问的偏移量值,那消费者只能按照偏移量的方式次序
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |