IT评测·应用市场-qidao123.com

标题: 一文说透kafka底层架构 [打印本页]

作者: 梦见你的名字    时间: 2022-9-16 17:16
标题: 一文说透kafka底层架构
底层架构

先停一下,学习之前,先看下如何学习,两篇不错的干货文章分享给你,一定要点开看下
6.1 存储架构

6.1.1 分段存储

开篇讲过,kafka每个主题可以有多个分区,每个分区在它所在的broker上创建一个文件夹
每个分区又分为多个段,每个段两个文件,log文件里顺序存消息,index文件里存消息的索引
段的命名直接以当前段的第一条消息的offset为名
注意是偏移量,不是序号! 第几条消息 = 偏移量 + 1。类似数组长度和下标。
所以offset从0开始(可以开新队列新groupid消费第一条消息打印offset得到验证)

例如:
0.log -> 有8条,offset为 0-7
8.log -> 有两条,offset为 8-9
10.log -> 有xx条,offset从10-xx

6.1.2 日志索引

每个log文件配备一个索引文件 *.index
文件格式为: (offset , 内存偏移地址)

综合上述,来看一个消息的查找:
6.1.3 日志删除

Kafka作为消息中间件,数据需要按照一定的规则删除,否则数据量太大会把集群存储空间占满。
删除数据方式:
Kafka删除数据的最小单位:segment,也就是直接干掉文件!一删就是一个log和index文件
6.1.4 存储验证

1)数据准备
将broker 2和3 停掉,只保留1
  1. docker pause kafka-2 kafka-3
复制代码
2)删掉test主题,通过km新建一个test主题,加2个分区
新建时,注意下面的选项:
segment.bytes = 1000 ,即:每个log文件到达1000byte时,开始创建新文件
删除策略:
retention.bytes = 2000,即:超出2000byte的旧日志被删除
retention.ms = 60000,即:超出1分钟后的旧日志被删除
以上任意一条满足,就会删除。
3)进入kafka-1这台容器
  1. docker exec -it kafka-1 sh
  2. #查看容器中的文件信息
  3. / # ls /
  4. bin    dev    etc    home   kafka  lib    lib64  media  mnt    opt    proc   root   run    sbin   srv    sys    tmp    usr    var
  5. / # cd /kafka/
  6. /kafka # ls
  7. kafka-logs-d0b9c75080d6
  8. /kafka # cd kafka-logs-d0b9c75080d6/
  9. /kafka/kafka-logs-d0b9c75080d6 # ls -l | grep test
  10. drwxr-xr-x    2 root     root          4096 Jan 15 14:35 test-0
  11. drwxr-xr-x    2 root     root          4096 Jan 15 14:35 test-1
  12. #2个分区的日志文件清单,注意当前还没有任何消息写进来
  13. #timeindex:日志的时间信息
  14. #leader-epoch,下面会讲到
  15. /kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
  16. test-0:
  17. total 4
  18. -rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
  19. -rw-r--r--    1 root     root             0 Jan 15 14:35 00000000000000000000.log
  20. -rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
  21. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  22. test-1:
  23. total 4
  24. -rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
  25. -rw-r--r--    1 root     root             0 Jan 15 14:35 00000000000000000000.log
  26. -rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
  27. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
复制代码
4)往里灌数据。启动项目通过swagger发送消息
注意!边发送边查看上一步的文件列表信息!
  1. #先发送2条,消息开始进来,log文件变大!消息在两个分区之间逐个增加。
  2. /kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
  3. test-0:
  4. total 8
  5. -rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
  6. -rw-r--r--    1 root     root           875 Jan 15 14:46 00000000000000000000.log
  7. -rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
  8. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  9. test-1:
  10. total 8
  11. -rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
  12. -rw-r--r--    1 root     root           875 Jan 15 14:46 00000000000000000000.log
  13. -rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
  14. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  15. #继续逐条发送,返回再来看文件,大小为1000,到达边界!
  16. /kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
  17. test-0:
  18. total 8
  19. -rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
  20. -rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
  21. -rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
  22. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  23. test-1:
  24. total 8
  25. -rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
  26. -rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
  27. -rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
  28. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  29. #继续发送消息!1号分区的log文件开始分裂
  30. #说明第8条消息已经进入了第二个log
  31. /kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
  32. test-0:
  33. total 8
  34. -rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
  35. -rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
  36. -rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
  37. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  38. test-1:
  39. total 20
  40. -rw-r--r--    1 root     root             0 Jan 15 14:46 00000000000000000000.index
  41. -rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
  42. -rw-r--r--    1 root     root            12 Jan 15 14:46 00000000000000000000.timeindex
  43. -rw-r--r--    1 root     root      10485760 Jan 15 14:46 00000000000000000008.index
  44. -rw-r--r--    1 root     root           125 Jan 15 14:46 00000000000000000008.log   #第二个log文件!
  45. -rw-r--r--    1 root     root            10 Jan 15 14:46 00000000000000000008.snapshot
  46. -rw-r--r--    1 root     root      10485756 Jan 15 14:46 00000000000000000008.timeindex
  47. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  48. #持续发送,另一个分区也开始分离
  49. /kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
  50. test-0:
  51. total 20
  52. -rw-r--r--    1 root     root             0 Jan 15 15:55 00000000000000000000.index
  53. -rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
  54. -rw-r--r--    1 root     root            12 Jan 15 15:55 00000000000000000000.timeindex
  55. -rw-r--r--    1 root     root      10485760 Jan 15 15:55 00000000000000000008.index
  56. -rw-r--r--    1 root     root           625 Jan 15 15:55 00000000000000000008.log
  57. -rw-r--r--    1 root     root            10 Jan 15 15:55 00000000000000000008.snapshot
  58. -rw-r--r--    1 root     root      10485756 Jan 15 15:55 00000000000000000008.timeindex
  59. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  60. test-1:
  61. total 20
  62. -rw-r--r--    1 root     root             0 Jan 15 14:46 00000000000000000000.index
  63. -rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
  64. -rw-r--r--    1 root     root            12 Jan 15 14:46 00000000000000000000.timeindex
  65. -rw-r--r--    1 root     root      10485760 Jan 15 14:46 00000000000000000008.index
  66. -rw-r--r--    1 root     root           750 Jan 15 15:55 00000000000000000008.log
  67. -rw-r--r--    1 root     root            10 Jan 15 14:46 00000000000000000008.snapshot
  68. -rw-r--r--    1 root     root      10485756 Jan 15 14:46 00000000000000000008.timeindex
  69. -rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint
  70. #持续发送消息,分区越来越多。
  71. #过一段时间后再来查看,清理任务将会执行,超出的日志被删除!(默认调度间隔5min)
  72. #log.retention.check.interval.ms 参数指定
  73. /kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
  74. test-0:
  75. total 8
  76. -rw-r--r--    1 root     root      10485760 Jan 15 19:12 00000000000000000119.index
  77. -rw-r--r--    1 root     root             0 Jan 15 19:12 00000000000000000119.log
  78. -rw-r--r--    1 root     root            10 Jan 15 19:12 00000000000000000119.snapshot
  79. -rw-r--r--    1 root     root      10485756 Jan 15 19:12 00000000000000000119.timeindex
  80. -rw-r--r--    1 root     root            10 Jan 15 19:12 leader-epoch-checkpoint
  81. test-1:
  82. total 8
  83. -rw-r--r--    1 root     root      10485760 Jan 15 19:12 00000000000000000119.index
  84. -rw-r--r--    1 root     root             0 Jan 15 19:12 00000000000000000119.log
  85. -rw-r--r--    1 root     root            10 Jan 15 19:12 00000000000000000119.snapshot
  86. -rw-r--r--    1 root     root      10485756 Jan 15 19:12 00000000000000000119.timeindex
  87. -rw-r--r--    1 root     root            10 Jan 15 19:12 leader-epoch-checkpoint
复制代码
6.2 零拷贝

Kafka 在执行消息的写入和读取这么快,其中的一个原因是零拷贝(Zero-copy)技术
6.2.1 传统文件读写


传统读写,涉及到 4 次数据的复制。但是这个过程中,数据完全没有变化,我们仅仅是想从磁盘把数据送到网卡。
那有没有办法不绕这一圈呢?让磁盘和网卡之类的外围设备直接访问内存,而不经过cpu?
有! 这就是DMA(Direct Memory Access 直接内存访问)。
6.2.2 DMA

DMA其实是由DMA芯片(硬件支持)来控制的。通过DMA控制芯片,可以让网卡等外部设备直接去读取内存,而不是由cpu来回拷贝传输。这就是所谓的零拷贝
目前计算机主流硬件基本都支持DMA,就包括我们的硬盘和网卡。
kafka就是调取操作系统的sendfile,借助DMA来实现零拷贝数据传输的

6.2.3 java实现

为加深理解,类比为java中的零拷贝:
代码参考:
  1. File file = new File("0.log");
  2. RandomAccessFile raf = new RandomAccessFile(file, "rw");
  3. //文件通道,来源
  4. FileChannel fileChannel = raf.getChannel();
  5. //网络通道,去处
  6. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("1.1.1.1", 1234));
  7. //对接上,通过transfer直接送过去
  8. fileChannel.transferTo(0, fileChannel.size(), socketChannel);
复制代码
6.3 分区一致性

6.3.1 水位值

1)先回顾两个值:

2)再看下几个值的存储位置:
注意!分区是有leader和follower的,最新写的消息会进入leader,follower从leader不停的同步
无论leader还是follower,都有自己的HW和LEO,存储在各自分区所在的磁盘上
leader多一个Remote LEO,它表示针对各个follower的LEO,leader又额外记了一份!
3)为什么这么做呢?
leader会拿这些remote值里最小的来更新自己的hw,具体过程我们详细往下看
6.3.2 同步原理


我们来看这几个值是如何更新的:
1)leader.LEO
这个很简单,每次producer有新消息发过来,就会增加
2)其他值
另外的4个值初始化都是 0
他们的更新由follower的fetch(同步消息线程)得到的数据来决定!
如果把fetch看做是leader上提供的方法,由follower远程请求调用,那么它的伪代码大概是这个样子:
  1. //java伪代码!
  2. //follower端的操作,不停的请求从leader获取最新数据
  3. class Follower{
  4.   private List<Message> messages;
  5.   private HW hw;
  6.   private LEO leo;
  7.   
  8.   @Schedule("不停的向leader发起同步请求")
  9.   void execute(){
  10.     //向leader发起fetch请求,将自己的leo传过去
  11.     //leader返回leo之后最新的消息,以及leader的hw
  12.     LeaderReturn lr = leader.fetch(this.leo) ;
  13.    
  14.     //存消息
  15.     this.messages.addAll(lr.newMsg);
  16.     //增加follower的leo值
  17.     this.leo = this.leo + lr.newMsg.length;
  18.     //比较自己的leo和leader的hw,取两者小的,作为follower的hw
  19.     this.hw = min(this.leo , lr.leaderHW);
  20.   }
  21. }
  22. //leader返回的报文
  23. class LeaderReturn{
  24.   //新增的消息
  25.   List<Messages> newMsg;
  26.   //leader的hw
  27.   HW leaderHW;
  28. }
复制代码
  1. //leader在接到follower的fetch请求时,做的逻辑
  2. class Leader{
  3.   private List<Message> messages;
  4.   private LEO leo;
  5.   private HW hw;
  6.   //Leader比follower多了个Remote!
  7.   //注意!如果有多个副本,那么RemoteLEO也有多个,每个副本对应一个
  8.   private RemoteLEO remoteLEO;
  9.   
  10.   //接到follower的fetch请求时,leader做的事情
  11.   LeaderReturn fetch(LEO followerLEO){
  12.     //根据follower传过来的leo,来更新leader的remote
  13.     this.remoteLEO = followerLEO ;
  14.     //然后取ISR(所有可用副本)的最小leo作为leader的hw
  15.     this.hw = min(this.leo , this.remoteLEO) ;
  16.    
  17.     //从leader的消息列表里,查找大于follower的leo的所有新消息
  18.     List<Message> newMsg = queryMsg(followerLEO) ;
  19.    
  20.     //将最新的消息(大于follower leo的那些),以及leader的hw返回给follower
  21.     LeaderReturn lr = new LeaderReturn(newMsg , this.hw)
  22.     return lr;
  23.   }
  24.   
  25. }
复制代码
6.3.3 Leader Epoch

1)产生的背景
0.11版本之前的kafka,完全借助hw作为消息的基准,不管leo。
发生故障后的规则:
假设:
我们有两个副本:leader(A),follower(B)
场景一:丢数据

场景二:数据不一致

2)改进思路
0.11之后,kafka改进了hw做主的规则,这就是leader epoch
leader epoch给leader节点带了一个版本号,类似于乐观锁的设计。
它的思想是,一旦发生机器故障,重启之后,不再机械的将leo退回hw
而是借助epoch的版本信息,去请求当前leader,让它去算一算leo应该是什么
3)实现原理
对比上面丢数据的问题:


再来看一致性问题的解决:


附:epochRequest的详细流程图

本文由传智教育博学谷 - 狂野架构师教研团队发布
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
转载请注明出处!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4