kafka数据在服务端时怎么写入的

打印 上一主题 下一主题

主题 502|帖子 502|积分 1506

学习配景

接着上篇,我们来聊聊kafka数据在服务端怎么写入的
服务端写入

在介绍服务端的写流程之前,我们先要明白服务端的几个角色之间的关系。
假设我们有一个由3个broker构成的kafka集群,我们在这个集群上创建一个topic叫做shitu-topic,他有10个分区,每个分区有3个副本。那么partition和broker的关系假设如下。


因为每个partition有3个副本,所以每个partition的副本都会均匀的分布在这三台呆板上,我们取shitu-topic-0的副本来观察。
在三个broker上,每个broker的log存储目录都有一个shitu-topic-0目录,我们可以成为shitu-topic-0分区,但是同一个时间,只有broker-0上的leader副本对外提供服务,broker-1和broker-2需要去到broker-0上同步消息。在shitu-topic-0目录下就是存储的实际的日志文件。日志文件里包含三个主要的文件内容.log文件存储实际的消息,.index文件存储索引,.timeindex文件存储时间索引。我们把这三个文件合称为一个logsegment日志段,每个log文件只要超过1G就会产生一个新的段文件。日志段文件的命名是以当前段内第一条消息的offset来命名的,这里因为是新创建的topic,第一条消息是0,所以都是0。因为消息是次序写入的,所以只有末了一个日志段是激活的我们称为active segemnt,活泼段。好比这里活泼段就是00000000000020123000开头的段。


研究消息的写入,就是研究这些文件时怎么产生的,让我们来看一下段文件里每个文件的构造格式。
写入文件

log文件

.log文件存储实际的写入日志,也就是实际的数据存储位置。kafka的log文件存储格式颠末了3次变化,目前使用的日志格式称为V2版本,我们取这个版本的日志格式来做讲解。


上图左侧表现的是log文件的格式,我们把log文件内存储的的消息集合称为record batch,而每条消息我们称为一条record,每条record的格式如右边所示。record batch内的字段主要记录的整个log文件的全局属性,好比log文件的起始偏移量,文件长度,epoch,时间戳等等,不做详细解释,也不是重点。
我们说一下每条消息的格式,我们知道每条消息除了实际的消息内容value外,陪同着每条消息的产生,还会产生这条消息的额外附带的信息,好比消息的偏移量,offsset,时间戳timestamp等等。kafka在设计消息的存储时花了很大的心思。
这里我解释一下varint,varlong范例,简单的说,就是可变长的范例。好比一条消息的偏移量是int存储容量是4字节,好比存储10这个偏移量,固然前面大部门是0,但是实际存储照旧需要4字节。而varint则可以根据数据的范围选择合适的存储,好比照旧10,那么实际记录这个值1个字节就够了。这样,当写入消息时,好比写入2条消息,偏移量分别是10和11,假如分别存储这两个偏移量,需要
  1. 2 * 4B = 8b
复制代码
而假如使用varint存储,则只需要
  1. 2 * 1B + 4B(基础偏移量) = 6B
复制代码
这里假如不是2条消息,而是10000条消息,那么这个优化就会非常有效。kafka这么做是为了尽最大的可能使用存储空间。固然除了数据格式上的优化,kafka还对数据进行了压缩,也就是records是可以配置差异的压缩算法进行压缩的,好比ZIP。
index文件

.index记录偏移量到实际消息的映射关系。一个很简单的述求,我们想知道某个偏移量的日志的内容,那么我们就需要一种能根据偏移量定位到消息的格式。
index文件的格式由相对偏移量realtive offset和物理偏移量position构成。当一条消息写入时,根据消息的偏移量计算出这条消息的相对偏移量,好比写入的是20123025这条消息,那么用20123025-20123000 = 25得到相对偏移量25,再记录下这消息的起始物理地址1024,即可构成对这条消息的索引。需要·注意的是,这里的索引是希罕索引,也就是不是每条消息都会产生索引,而是每隔一些消息产生索引,这样能减少索引的文件巨细。
每一条索引的需要4B的相对偏偏移量和4B的物理地址偏移量,一共8B,kafka的服务端在设置index文件最大巨细时要求index文件必须是索引项的整数倍,假如不是,则会主动转换成最靠近的整数倍的数字。



各人这里肯定很好奇那么怎么利用相对偏移量来查找消息,我们解释一下,着实对消息的查找可以概述为根据二分法查找。好比想要查找20123050这条偏移量的消息,先根据这个偏移量,去到我们当前副本的segement集合中根据segement的起始偏移量找到对应的segement,所有的segement的信息是根据相对偏移量以跳表的形式记录的。找到的对应的segement后先计算出相对偏移量20123050-20123000 = 50,然后根据50这个相对偏移量,我们去到相对偏量数组里,使用二分查找找到[20,75]这个相对偏移量范围,那么我们可以在log文件里从1024字节开始,逐条消息的剖析,并计算出消息的偏移量是不是50,直到2147字节这个竣事的位置为止。假如能找到,说明消息在本partition内,不能我们再换另外的partition查找。
timeindex文件

timeidnex记录时间戳到实际消息的映射关系,我们介绍了index文件的格式,再来明白timeindex文件的格式就轻易多了。timeindex文件和index文件的格式类似,由时间戳相对偏移量和消息相对偏移量构成。时间戳相对偏移量根据消息的写入时间来计算,好比写入时间是1733001000,用这个写入的时间减去timeindex文件的起始时间1733000000得到1000这个相对时间戳偏移量。


timeindex文件的查找我们就不说了,各人可以参考index文件。需要注意一点timeindex文件的时间戳是可以设置的,固然一样平常kafka服务端会接纳主动设置消息写入时间的配置,即log.message.timestamp.type=LogAppendTime,这种情况下因为时间戳由服务器端设置,可以或许保证时间戳递增。但是假如服务端设置的是CreateTime,并且producer本身设置了消息的生产时间,那么有可能造成timeIndex的写入失败,因为timeindex要求写入的时间必须是递增的。假如不递增,则拒绝本次写入。另有就是,timeindex文件和index文件固然都是索引,但是他们并不是每条索引项逐一对应的,各人从图中也能看出来。
根据timeindex查找对应消息的过程也和index文件的查找类似,不外因为timeindex本身是根据时间戳来查找,所以会有一步先查找每个timeindex文件的最大时间戳,直到找到一个大于查找时间并且最靠近查找时间的timeindex文件。这里有点绕,举个例子,第一个timeindex文件的最大时间戳10000,第二个timeindex文件最大时间戳23000,第三个timeindex文件最大时间戳50000,要查找时间戳为15000的消息,那么因为timeindex文件的时间戳是次序递增的,很明显,第三个文件的消息都是在15000之后产生的,第一个文件的消息都是在15000之前产生的,那么理所应当的,正好拥有大于15000的时间戳23000的第二个文件理论上应该包含15000这个时间戳写入的消息,所以找到第二个文件。找到对应的文件后再去到到对应的这个timeindex文件根据时间偏移量索引找到这个对应的消息(找不到就换partition)。
写入过程

介绍完毕实际的文件内容,我们再来归纳一下数据的写入过程。这里不会介绍副本之间的同步的问题,只介绍在leader副本上数据的写入。
当消息通过client发送到broker上时,broker根据消息的topic找到这个topic的leadder副本。leadter副本根据消息的信息计算出消息归属的parititon。找到parititon后根据偏移量设置计算出消息的偏移量和时间戳,再找到对应的active segement,在index文件中追加消息,并根据需要决定是否写入index文件和timeindex文件。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

星球的眼睛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表