日志分段切分条件
日志分段文件切分包含以下4个条件,满足其一即可:
- 当前日志分段文件的大小超过了broker端参数 log.segment.bytes 配置的值。log.segment.bytes参数的默认值为 1073741824,即1GB
- 当前日志分段中消息的最小时间戳与当前系统的时间戳的差值大于log.roll.ms或log.roll.hours参数配置的值。如果同时配置了log.roll.ms和log.roll.hours参数,那么log.roll.ms的优先级高,默认情况下,只配置了log.roll.hours参数,其值为168,即7天。
- 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数log.index.size.max.bytes配置的值。log.index.size .max.bytes的默认值为10485760,即10MB
- 追加的消息的偏移量与当前日志分段的起始偏移量之间的差值大于Integer.MAX_VALUE, 即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。
什么是Controller
Controller作为Kafka集群中的核心组件,它的主要作用是在Apache ZooKeeper的帮助下管理和协调整个Kafka集群。
Controller与Zookeeper进行交互,获取与更新集群中的元数据信息。其他broker并不直接与zookeeper进行通信,而是与Controller进行通信并同步Controller中的元数据信息。
Kafka集群中每个节点都可以充当Controller节点,但集群中同时只能有一个Controller节点。
Controller简单来说,就是kafka集群的状态管理者
controller竞选机制:简单说,先来先上!
Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责维护整个集群中所有分区和副本的状态及分区leader的选举。
当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:- {"version":1,"brokerid":0,"timestamp":"1529210278988"}
复制代码 其中version在目前版本中固定为1,brokerid表示成为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳。
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取zookeeper上的/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

controller的职责
- 对Zookeeper中的/admin/reassign_partitions节点注册PartitionReassignmentListener,用来处理分区重分配的动作。
- 对Zookeeper中的/isr_change_notification节点注册IsrChangeNotificetionListener,用来处理ISR集合变更的动作。
- 对Zookeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionListener,用来处理优先副本选举。
复制代码- 对Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;
- 对Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作
复制代码- 对Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化
复制代码- 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。
- 对各topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。并将最新信息同步给其他所有broker。
复制代码
- 启动并管理分区状态机和副本状态机。
- 如果参数auto.leader.rebalance.enable设置为true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的leader副本的均衡。
分区的负载分布
客户端请求创建一个topic时,每一个分区副本在broker上的分配,是由集群controller来决定;
结论:里面会创建出来两个随机数
第一个随机数确定0号分区leader的位置,往后1号分区2号分区的leader依次往后顺延1
第二个随机数确定每个分区的第一个副本的位置 在leader所在机器上往后顺延(随机数+1)台机器,该台机器就是第一个副本的位置,剩余副本依次往后顺延1- // 举例:
- // broker_id = 0~19 一共20台机器
- // 分区数20,副本数10
- // 第一个随机数:19
- // 第二个随机数:0
- (0,ArrayBuffer(19, 0, 1, 2, 3, 4, 5, 6, 7, 8))
- (1,ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
- (2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
- (3,ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
- (4,ArrayBuffer(3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
- (5,ArrayBuffer(4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
- (6,ArrayBuffer(5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
- (7,ArrayBuffer(6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
- (8,ArrayBuffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16))
- (9,ArrayBuffer(8, 9, 10, 11, 12, 13, 14, 15, 16, 17))
- (10,ArrayBuffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18))
- (11,ArrayBuffer(10, 11, 12, 13, 14, 15, 16, 17, 18, 19))
- (12,ArrayBuffer(11, 12, 13, 14, 15, 16, 17, 18, 19, 0))
- (13,ArrayBuffer(12, 13, 14, 15, 16, 17, 18, 19, 0, 1))
- (14,ArrayBuffer(13, 14, 15, 16, 17, 18, 19, 0, 1, 2))
- (15,ArrayBuffer(14, 15, 16, 17, 18, 19, 0, 1, 2, 3))
- (16,ArrayBuffer(15, 16, 17, 18, 19, 0, 1, 2, 3, 4))
- (17,ArrayBuffer(16, 17, 18, 19, 0, 1, 2, 3, 4, 5))
- (18,ArrayBuffer(17, 18, 19, 0, 1, 2, 3, 4, 5, 6))
- (19,ArrayBuffer(18, 19, 0, 1, 2, 3, 4, 5, 6, 7))
- // 其分布策略源码如下:
- private def assignReplicasToBrokersRackUnaware(
- nPartitions: Int, //分区的个数 10
- replicationFactor: Int, //副本的个数 5
- brokerList: Seq[Int],//broker的集合 8 0~7
- fixedStartIndex: Int//默认值是-1 固定开始的索引位置
- startPartitionId: Int): Map[Int, Seq[Int]] //默认值是-1 分区开始的位置
- = {
- val ret = mutable.Map[Int, Seq[Int]]()
- val brokerArray = brokerList.toArray
- val startIndex = if (fixedStartIndex >= 0) {
- fixedStartIndex
- }else {
- rand.nextInt(brokerArray.length)
- }
- var currentPartitionId = math.max(0, startPartitionId)
- var nextReplicaShift = if (fixedStartIndex >= 0) {
- fixedStartIndex
- }else {
- rand.nextInt(brokerArray.length)
- }
- for (_ <- 0 until nPartitions) {
- if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)){
- nextReplicaShift += 1
- }
- val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
- val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
- for (j <- 0 until replicationFactor - 1) {
- replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
- }
- ret.put(currentPartitionId, replicaBuffer)
- currentPartitionId += 1
- }
- ret
- }
-
- private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
- val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
- (firstReplicaIndex + shift) % nBrokers
- }
复制代码 Range Strategy
- 先将消费者按照client.id字典排序,然后按topic逐个处理;
- 针对一个topic,将其partition总数/消费者数得到商n和 余数m,则每个consumer至少分到n个分区,且前m个consumer每人多分一个分区;
例1:
假设有TOPIC_A有5个分区,由3个consumer(C1,C2,C3)来消费;5/3得到商1,余2,则每个消费者至少分1个分区,前两个消费者各多1个分区C1: 2个分区,C2:2个分区,C3:1个分区
接下来,就按照“区间”进行分配:
TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_A_3 TOPIC_A-4
C1: TOPIC_A-0, TOPIC_A-1
C2 : TOPIC_A-2, TOPIC_A_3
C3: TOPIC_A-4
例2:
假设TOPIC_A有5个分区,TOPIC_B有3个分区,由2个consumer(C1,C2)来消费
5/2得到商2,余1,则C1有3个分区,C2有2个分区,得到结果
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
C2: TOPIC_A-3 TOPIC_A-4
3/2得到商1,余1,则C1有2个分区,C2有1个分区,得到结果
C1: TOPIC_B-0 TOPIC_B-1
C2: TOPIC_B-2
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
Round-Robin Strategy
- 将所有主题分区组成TopicAndPartition列表,并对TopicAndPartition列表按照其hashCode 排序
- 然后,以轮询的方式分配给各消费者
以上述“例2”来举例:
- 先对TopicPartition的hashCode排序,假如排序结果如下:
TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1
C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3
C3 TOPIC_A-4
Sticky Strategy
对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky策略的特点:
- 要去达成最大化的均衡
- 尽可能保留各消费者原来分配的分区
再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)
Cooperative Sticky Strategy
对应的类叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
sticky策略的特点:
- 逻辑与sticky策略一致
- 支持cooperative再均衡机制(再均衡的过程中,不会让所有消费者取消掉所有分区然后再进行重分配)
费者组再均衡流程
消费组在消费数据的时候,有两个角色进行组内的各事务的协调;
角色1: Group Coordinator (组协调器) 位于服务端(就是某个broker)
组协调器的定位:- partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
- partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
复制代码 角色2: Group Leader (组长) 位于消费端(就是消费组中的某个消费者)
组长的定位:随机选的哦!!!
GroupCoordinator介绍
每个消费组在服务端对应一个GroupCoordinator其进行管理,GroupCoordinator是Kafka服务端中用于管理消费组的组件。
消费者客户端中由ConsumerCoordinator组件负责与GroupCoordinator行交互;
ConsumerCoordinator和GroupCoordinator最重要的职责就是负责执行消费者rebalance操作
再均衡监听器
如果想控制消费者在发生再均衡时执行一些特定的工作,可以通过订阅主题时注册“再均衡监听器”来实现;
场景举例:在发生再均衡时,处理消费位移
如果A消费者消费掉的一批消息还没来得及提交offset,而它所负责的分区在rebalance中转移给了B消费者,则有可能发生数据的重复消费处理。此情形下,可以通过再均衡监听器做一定程度的补救;
代码示例:- coordinator在我们组记偏移量的__consumer_offsets分区的leader所在broker上
- 查找Group Coordinator的方式:
- 先根据消费组groupid的hashcode值计算它应该所在__consumer_offsets 中的分区编号; 分区数
- Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
- groupMetadataTopicPartitionCount为__consumer_offsets的分区总数,这个可以通过broker端参数offset.topic.num.partitions来配置,默认值是50;
- 找到对应的分区号后,再寻找此分区leader副本所在broker节点,则此节点即为自己的Grouping Coordinator;
复制代码 kafka系统的CAP保证
CAP理论作为分布式系统的基础理论,它描述的是一个分布式系统在以下三个特性中:
- 一致性(Consistency)
- 可用性(Availability)
- 分区容错性(Partition tolerance)
最多满足其中的两个特性。也就是下图所描述的。分布式系统要么满足CA,要么CP,要么AP。无法同时满足CAP。
分区容错性:指的分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务。也就是说部分故障不影响整体使用。事实上我们在设计分布式系统时都会考虑到bug,硬件,网络等各种原因造成的故障,所以即使部分节点或者网络出现故障,我们要求整个系统还是要继续使用的(不继续使用,相当于只有一个分区,那么也就没有后续的一致性和可用性了)
可用性:一直可以正常的做读写操作。简单而言就是客户端一直可以正常访问并得到系统的正常响应。用户角度来看就是不会出现系统操作失败或者访问超时等问题。
一致性:在分布式系统完成某写操作后任何读操作,都应该获取到该写操作写入的那个最新的值。相当于要求分布式系统中的各节点时时刻刻保持数据的一致性。
Kafka 作为一个商业级消息中间件,数据可靠性和可用性是优先考虑的重点,兼顾数据一致性;
参考文档:https://www.cnblogs.com/lilpig/p/16840963.html
幂等性
幂等性要点
Kafka 0.11.0.0 版本开始引入了幂等性与事务这两个特性,以此来实现 EOS ( exactly once semantics ,精确一次处理语义)
生产者在进行发送失败后的重试时(retries),有可能会重复写入消息,而使用 Kafka幂等性功能之后就可以避免这种情况。
开启幂等性功能,只需要显式地将生产者参数 enable.idempotence设置为 true (默认值为 false):- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.header.Headers;
- import org.apache.kafka.common.record.TimestampType;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import java.time.Duration;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Optional;
- import java.util.Properties;
- /**
- * 消费组再均衡观察
- */
- public class ConsumerDemo2 {
- public static void main(String[] args) {
- //1.创建kafka的消费者对象,附带着把配置文件搞定
- Properties props = new Properties();
- props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g01");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- //2.订阅主题(确定需要消费哪一个或者多个主题)
- //我现在想看看如果我的消费者组里面,多了一个消费者或者少了一个消费者,他有没有给我做再均衡
- consumer.subscribe(Arrays.asList("reb-1", "reb-2"), new ConsumerRebalanceListener() {
- /**
- * 这个方法是将原来的分配情况全部取消,或者说把所有的分区全部回收了
- * 这个全部取消很恶心,原来的消费者消费的好好的,他一下子就给他全部停掉了
- * @param collection
- */
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> collection) {
- System.out.println("我原来的均衡情况是:"+collection + "我已经被回收了!!");
- }
- /**
- * 这个方法是当上面的分配情况全部取消以后,调用这个方法,来再次分配,这是在均衡分配后的情况
- * @param collection
- */
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> collection) {
- System.out.println("我是重新分配后的结果:"+collection);
- }
- });
- while (true){
- consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
- }
- }
- }
复制代码 在开启幂等性功能时,如下几个参数必须正确配置:
<ul>retries > 0
max.in.flight.requests.per.connection发送一个ControlBatch消息(事务开始)
提交事务-->发送一个ControlBatch消息(事务提交)
放弃事务-->发送一个ControlBatch消息(事务终止)</p>
- 开启事务的必须配置参数(不支持数据得回滚,但是我能做到,一荣俱荣,一损俱损)
- props.put("enable.idempotence",true);
复制代码 事务控制的代码模板- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
- props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- // acks
- props.setProperty(ProducerConfig.ACKS_CONFIG,"-1");
- // 生产者的重试次数
- props.setProperty(ProducerConfig.RETRIES_CONFIG,"3");
- // 飞行中的请求缓存最大数量
- props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"3");
- // 开启幂等性
- props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
- // 设置事务id
- props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"trans_001");
复制代码 消费者api是会拉取到尚未提交事务的数据的;只不过可以选择是否让用户看到!
是否让用户看到未提交事务的数据,可以通过消费者参数来配置:- // 初始化事务
- producer.initTransaction( )
- // 开启事务
- producer.beginTransaction( )
- // 干活
- // 提交事务
- producer.commitTransaction( )
- // 异常回滚(放弃事务) catch里面
- producer.abortTransaction( )
复制代码
- kafka还有一个“高级”事务控制,只针对一种场景:
用户的程序,要从kafka读取源数据,数据处理的结果又要写入kafka
kafka能实现端到端的事务控制(比起上面的“基础”事务,多了一个功能,通过producer可以将consumer的消费偏移量绑定到事务上提交)- isolation.level=read_uncommitted(默认值)
- isolation.level=read_committed
复制代码 事务api示例
为了实现事务,应用程序必须提供唯一transactional.id,并且开启生产者的幂等性- producer.sendOffsetsToTransaction(offsets,consumer_id)
复制代码 “消费kafka-处理-生产结果到kafka”典型场景下的代码结构示例:- properties.put ("transactional.id","transactionid00001");
- properties.put ("enable.idempotence",true);
复制代码 Kafka速度快的原因
- 消息顺序追加(磁盘顺序读写比内存的随机读写还快)
- 页缓存等技术(数据交给操作系统的页缓存,并不真正刷入磁盘;而是定期刷入磁盘)
使用Zero-Copy (零拷贝)技术来进一步提升性能;
零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手;
零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换;对于Linux系统而言,零拷贝技术依赖于底层的 sendfile( )方法实现;对应于Java 语言,FileChannal.transferTo( )方法的底层实现就是 sendfile( )方法;
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |