Kafka源码分析(十八)——Broker:日志子系统——团体架构 ...

打印 上一主题 下一主题

主题 934|帖子 934|积分 2802

作者简介:大家好,我是smart哥,前复兴通讯、美团架构师,现某互联网公司CTO

接洽qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬
  学习必须往深处挖,挖的越深,根本越踏实!
阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析

阶段4、深入jdk其余源码解析

阶段5、深入jvm源码解析
码哥源码部分
码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】
码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】
码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】
​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】
码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操纵?】
码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】
码哥讲源码【谁再说Spring不支持多线程事件,你给我抽他!】
终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!
打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果看成最终结果的水货们请闭嘴】
 
从开始本章,我将讲解Kafka Server的日志子系统(Log Subsystem)。Log Subsystem负责的核心工作就是日志的长期化,也就是写消息到磁盘。Kafka之以是具有极高的性能,和Log Subsystem的优秀设计是分不开的。
本章,我先带大家回顾下Kafka的整个日志文件系统的设计,然后对Server端的几个核心日志组件进行团体讲解。

一、日志结构

我们回顾下《透彻理解Kafka(二)——消息存储:日志格式》中的内容,假设有一个名为“ topic”的主题,此主题具有 4 个分区 ,那么在物理存储上就有topic-0、topic-1、topic-2、topic-3四个目录:
  1.     [root@nodel kafka- logs]# ls -al | grep topic-log
  2.     drwxr-xr - x 2 root root 4096 May 16 18: 33 topic-0
  3.     drwxr-xr - x 2 root root 4096 May 16 18: 33 topic-1
  4.     drwxr-xr - x 2 root root 4096 May 16 18: 33 topic-2
  5.     drwxr-xr - x 2 root root 4096 May 16 18: 33 topic-3
复制代码
每个分区的目录下,都有许多log segment file(日志段文件),也就是说每个分区的数据都会被拆成多个文件,并且每个文件都有自己的索引文件,如下图:

当生产者发送消息时,Kafka会将消息 顺序写入分区的最后一个 LogSegment 中,分区中的消息具有唯一的 offset ,每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其它文件(比如以.txnindex为后缀的事件索引文件),比如:
  1.     00000000000000000000.index
  2.     00000000000000000000.log
  3.     00000000000000000000.timeindex
  4.    
  5.     00000000000005367851.index
  6.     00000000000005367851.log
  7.     00000000000005367851.timeindex
  8.    
  9.     00000000000009936472.index
  10.     00000000000009936472.log
  11.     00000000000009936472.timeindex
复制代码
每个 LogSegment 都有一个基准偏移量 baseOffset(64 位长整型数),用来表示当前 LogSegment中第一条消息的 offset 。比如上述的示例中,第一个 LogSegment 的基准偏移量为 0,对应的日志文件为00000000000000000000.log,而9936472就表示00000000000009936472.log这个日志段文件的baseOffset。
Kafka Broker中有一个参数log.segment.bytes,限定了每个日志段文件的大小,默认1GB。一个日志段文件满了,就会新建一个日志段文件来写入,制止单个文件过大,影响文件的读写性能,这个过程叫做 log rolling ,正在被写入的谁人日志段文件叫做 active log segment 。
   Kafka使用了ConcurrentSkipListMap来保存各个日志段,以每个日志段的baseOffset作为 key ,这样可以根据消息的offset快速定位到其所在的日志分段 。
  
1.1 索引文件

Kafka在写入日志文件的时间,会同时写索引文件,一个是 位移(偏移量)索引 (.index后缀),一个是 时间戳索引 (.timeindex后缀),也就是说每个日志段文件(.log后缀)都对应两个索引文件。
索引文件里的数据是按照位移/时间戳升序排序的,Kafka会用二分法查找索引,时间复杂度是O(logN),找到索引就可以在.log文件里定位到数据了。

   Kafka以 希罕矩阵 的方式构造索引,不包管每个消息都有对应的索引。Kafka Broker中有个参数log.index.interval.bytes,默认值为4096字节,表示每往日志文件写入4096字节数据就要在索引文件里写一条索引项。
  
位移索引

位移索引就是用来纪录消息偏移量(offset)与物理所在之间的映射关系。位移索引项的格式如下,每个索引项占用4个字节,分为两个部分:

relativeOffset: 相对位移,表示消息相对于基准偏移量 baseOffset的位移,也就是说relativeOffset = 绝对位移 - baseOffset;
position: 物理所在,表示消息在日志分段文件中对应的物理位置。
举个例子:假设我们要查找分区中offset=25的消息,是怎样一个流程呢?

  • 起首,由于Kafka使用了ConcurrentSkipListMap来保存各个日志分段(以每个日志段的baseOffset作为 key) ,以是可以快速定位到offset=25这个消息所在的日志分段;
  • 假设第一步定位到了00000000000000000000.log这个日志分段,它的基准偏移量 baseOffset 为0,那么相对位移 = 25- 0 = 25;
  • 接着,通过二分法在00000000000000000000.index中查找最后一个relativeOffset ≤ 25的索引项,找到了[22,656];
  • 最后,根据索引项中的position值,即656,从00000000000000000000.log中的656物理位置开始向后查找,直到找到offset=25的消息。


时间戳索引

时间戳索引项的格式如下图:

timestamp : 当前日志分段最大的时间戳。
relativeOffset: 时间戳所对应的消息的相对位移。
同样举个例子:假设我们要查找时间戳 targetTimeStamp = 1526384718288 开始的消息,是怎样一个流程呢?
起首,要找到targetTimeStamp所在的日志段,这里无法使用跳表来快速定位,必要进行以下步调:

  • 将 targetTimeStamp 和每个日志段中的最大时间戳 largestTimeStamp 逐一对比(每个日志段都会纪录自己的最大时间戳),直到找到最后一个小于等于targetTimeStamp 的日志段,假设就是00000000000000000000.log这个日志段;
  • 然后在这个日志段的时间戳索引文件00000000000000000000.timeindex中进行二分查找,找到最后一个小于等于targetTimeStamp 的索引项,这里是1526384718283,它的相对位移是28;
  • 接着,再根据相对位移28,去00000000000000000000.index中查找消息的物理所在,找到了[26,838]这个索引项;
  • 最后,从00000000000000000000.log的838的物理位置开始向后遍历查找 targetTimeStamp = 1526384718288的消息。

从上面的整个流程也可以看出,根据时间戳索引查找消息时,要 进行两次索引查找 ,服从要比直接根据位移索引查找低许多。
   索引项中的 timestamp 必须大于之前追加的索引项的 timestamp ,否则不予追加 。假如 Kafka Broker 端参数log.message.timestamp.type设置为LogAppendTime,那么消息的时间戳肯定能够保持单调递增;相反,假如由生产者自己指定时间戳,则可能造成当前分区的时间戳乱序。
  
二、核心组件

回顾了Kafka的日志结构,我们从源码的角度看下Log Subsystem所涉及的核心组件:


  • ReplicaManager: 负责管理和操纵集群中 Broker 的副本,还承担部分分区管理工作;
  • LogManager: 日志管理类,负责控制日志创建、日志读取/写入、日志清理;
  • Log: 日志是日志段的容器,负责管理日志段;
  • LogSegment: 日志段;
  • OffsetIndex: 位移索引;
  • TimeIndex: 时间索引。
我在后续章节会对上述这些组件的底层源码进行分析,但是我们还是先按照正常的功能调用流程来讲解。

2.1 初始化

KafkaServer启动后,会创建LogManager和ReplicaManager,这两个是与日志读写相关的核心管理组件,ReplicaManager内部封装了LogManager:
  1.     // KafkaServer.scala
  2.     def startup() {
  3.         logManager = createLogManager(zkUtils.zkClient, brokerState)
  4.         logManager.startup()
  5.    
  6.         replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
  7.           isShuttingDown, quotaManagers.follower)
  8.         replicaManager.startup()
  9.    
  10.         //...
  11.     }
复制代码
我们先来看下LogManager的初始化,内部就是启动了一些调度使命,包括:日志清理、日志刷磁盘、更新日志检查点等等:
  1.     // LogManager.scala
  2.    
  3.     def startup() {
  4.       if(scheduler != null) {
  5.         // 日志清理任务
  6.         info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
  7.         scheduler.schedule("kafka-log-retention",
  8.                            cleanupLogs,
  9.                            delay = InitialTaskDelayMs,
  10.                            period = retentionCheckMs,
  11.                            TimeUnit.MILLISECONDS)
  12.         // 日志刷磁盘任务
  13.         info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
  14.         scheduler.schedule("kafka-log-flusher",
  15.                            flushDirtyLogs,
  16.                            delay = InitialTaskDelayMs,
  17.                            period = flushCheckMs,
  18.                            TimeUnit.MILLISECONDS)
  19.         // 更新日志检查点任务
  20.         scheduler.schedule("kafka-recovery-point-checkpoint",
  21.                            checkpointRecoveryPointOffsets,
  22.                            delay = InitialTaskDelayMs,
  23.                            period = flushCheckpointMs,
  24.                            TimeUnit.MILLISECONDS)
  25.         // 删除日志任务
  26.         scheduler.schedule("kafka-delete-logs",
  27.                            deleteLogs,
  28.                            delay = InitialTaskDelayMs,
  29.                            period = defaultConfig.fileDeleteDelayMs,
  30.                            TimeUnit.MILLISECONDS)
  31.       }
  32.       if(cleanerConfig.enableCleaner)
  33.         cleaner.startup()
  34.     }
复制代码
再来看下ReplicaManager的初始化,它启动了两个调度使命,都是和ISR相关的,一个是清理落后太多的ISR副本,一个是将最新的ISR结果进行传播:
  1.     // ReplicaManager.scala
  2.    
  3.     def startup() {
  4.       // 清理ISR中落后的Follower副本
  5.       scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2,
  6.                          unit = TimeUnit.MILLISECONDS)
  7.       // 传播最新的ISR副本
  8.       scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L,
  9.                          unit = TimeUnit.MILLISECONDS)
  10.     }
复制代码

三、团体流程

回忆一下,Kafka网络层(Network Layer)在担当到消息后,最终会转交给Kafka API层处理惩罚,而API层内部封装了ReplicaManager,在处理惩罚请求时,会委托ReplicaManager完成消息的写入:
  1.     // KafkaApis.scala
  2.    
  3.     def handleProducerRequest(request: RequestChannel.Request) {
  4.       val produceRequest = request.body.asInstanceOf[ProduceRequest]
  5.       //...
  6.       if (authorizedRequestInfo.isEmpty)
  7.         sendResponseCallback(Map.empty)
  8.       else {
  9.         val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
  10.    
  11.         // 委托ReplicaManager将消息写入分区的各个副本
  12.         replicaManager.appendRecords(
  13.           produceRequest.timeout.toLong,
  14.           produceRequest.acks,
  15.           internalTopicsAllowed,
  16.           authorizedRequestInfo,
  17.           sendResponseCallback)
  18.    
  19.         produceRequest.clearPartitionRecords()
  20.       }
  21.     }
复制代码
我用下面这两张流程图表示,忽略掉了许多非核心细节:



3.1 ReplicaManager

找到了消息写入的入口——ReplicaManager.appendRecords,我们来分析下消息写入的团体流程。ReplicaManager会委托LogManager完成消息的磁盘长期化:
  1.     // ReplicaManager.scala
  2.    
  3.     def appendRecords(timeout: Long,
  4.                       requiredAcks: Short,
  5.                       internalTopicsAllowed: Boolean,
  6.                       entriesPerPartition: Map[TopicPartition, MemoryRecords],
  7.                       responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
  8.    
  9.       // 判断是否为有效的ACK参数
  10.       if (isValidRequiredAcks(requiredAcks)) {
  11.         val sTime = time.milliseconds
  12.         // 关键:将消息写入当前Broker的Leader副本
  13.         val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
  14.    
  15.         debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
  16.         val produceStatus = localProduceResults.map { case (topicPartition, result) =>
  17.           topicPartition ->
  18.                   ProducePartitionStatus(
  19.                     result.info.lastOffset + 1, // required offset
  20.                     new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime))
  21.         }
  22.    
  23.         // 如果是延迟消息
  24.         if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
  25.           val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
  26.           val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
  27.    
  28.           val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
  29.           delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
  30.         } else {    // 如果是普通消息
  31.           // 正常调用响应回调方法
  32.           val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
  33.           responseCallback(produceResponseStatus)
  34.         }
  35.       } else {    // 如果是无效ACK参数,返回异常
  36.         // 异常状态码
  37.         val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
  38.           topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
  39.           LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
  40.         }
  41.         // 调用回调方法
  42.         responseCallback(responseStatus)
  43.       }
  44.     }
复制代码
上述代码最核心的部分是调用了appendToLocalLog,内部通过调用Partition.appendRecordsToLeader方法,往指定的分区写入消息日志:
  1.     // ReplicaManager.scala
  2.    
  3.     private def appendToLocalLog(internalTopicsAllowed: Boolean,
  4.                                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  5.                                  requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  6.       trace("Append [%s] to local log ".format(entriesPerPartition))
  7.       entriesPerPartition.map { case (topicPartition, records) =>
  8.         BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
  9.         BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
  10.    
  11.         // 如果是内部主题,且Broker参数配置不允许写入内部主题,则直接报错
  12.         if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
  13.           (topicPartition, LogAppendResult(
  14.             LogAppendInfo.UnknownLogAppendInfo,
  15.             Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
  16.         } else {
  17.           // 正常流程
  18.           try {
  19.             // 1.获取分区
  20.             val partitionOpt = getPartition(topicPartition)
  21.             val info = partitionOpt match {
  22.               case Some(partition) =>
  23.                 // 2.写入消息
  24.                 partition.appendRecordsToLeader(records, requiredAcks)
  25.               case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
  26.                 .format(topicPartition, localBrokerId))
  27.             }
  28.    
  29.             val numAppendedMessages =
  30.               if (info.firstOffset == -1L || info.lastOffset == -1L)
  31.                 0
  32.               else
  33.                 info.lastOffset - info.firstOffset + 1
  34.    
  35.             //...
  36.           } catch {
  37.             //...
  38.           }
  39.         }
  40.       }
  41.     }
复制代码

3.2 Partition

来看下Partition.appendRecordsToLeader()方法,它的内部又调用了Log.append()方法写入消息:
  1.     // Partition.scala
  2.    
  3.     def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
  4.       val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
  5.         leaderReplicaIfLocal match {
  6.           case Some(leaderReplica) =>
  7.             val log = leaderReplica.log.get
  8.             val minIsr = log.config.minInSyncReplicas
  9.             val inSyncSize = inSyncReplicas.size
  10.    
  11.             // 确保ISR副本数符合写入要求
  12.             if (inSyncSize < minIsr && requiredAcks == -1) {
  13.               throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
  14.                 .format(topicPartition, inSyncSize, minIsr))
  15.             }
  16.    
  17.             // 调用Log.append方法完成消息日志的写入
  18.             val info = log.append(records, assignOffsets = true)
  19.             replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
  20.             // we may need to increment high watermark since ISR could be down to 1
  21.             (info, maybeIncrementLeaderHW(leaderReplica))
  22.    
  23.           case None =>
  24.             throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
  25.               .format(topicPartition, localBrokerId))
  26.         }
  27.       }
  28.       //...
  29.     }
复制代码

3.3 Log

Log是Log Subsystem中最核心的一个类,它的append方法调用LogSegment.append()完成消息的写入:
  1.     // Log.scala
  2.    
  3.     def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
  4.       //...
  5.    
  6.       try {
  7.         lock synchronized {
  8.           // ...
  9.    
  10.           // 判断是否需要新增分段日志,并返回最新的一个分段日志
  11.           val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
  12.             maxTimestampInMessages = appendInfo.maxTimestamp,
  13.             maxOffsetInMessages = appendInfo.lastOffset)
  14.    
  15.           // 写入日志
  16.           segment.append(firstOffset = appendInfo.firstOffset,
  17.             largestOffset = appendInfo.lastOffset,
  18.             largestTimestamp = appendInfo.maxTimestamp,
  19.             shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
  20.             records = validRecords)
  21.    
  22.           // 增加LEO
  23.           updateLogEndOffset(appendInfo.lastOffset + 1)
  24.    
  25.           // 刷磁盘
  26.           if (unflushedMessages >= config.flushInterval)
  27.             flush()
  28.    
  29.           appendInfo
  30.         }
  31.       } catch {
  32.         case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  33.       }
  34.     }
复制代码

3.4 LogSegment

LogSegment的append方法,内部又调用了FileRecords.append()完成消息写入:
  1.     // LogSegment.scala
  2.    
  3.     def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
  4.       if (records.sizeInBytes > 0) {
  5.         // 物理位置
  6.         val physicalPosition = log.sizeInBytes()
  7.         if (physicalPosition == 0)
  8.           rollingBasedTimestamp = Some(largestTimestamp)
  9.         // append the messages
  10.         require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
  11.         // 调用FileRecords.append完成消息写入
  12.         val appendedBytes = log.append(records)
  13.    
  14.         if (largestTimestamp > maxTimestampSoFar) {
  15.           maxTimestampSoFar = largestTimestamp
  16.           offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
  17.         }
  18.         // 写入索引
  19.         if(bytesSinceLastIndexEntry > indexIntervalBytes) {
  20.           index.append(firstOffset, physicalPosition)
  21.           timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
  22.           bytesSinceLastIndexEntry = 0
  23.         }
  24.         bytesSinceLastIndexEntry += records.sizeInBytes
  25.       }
  26.     }
复制代码
最后看下FileRecords.append():
  1.     // FileRecords.java
  2.    
  3.     public int append(MemoryRecords records) throws IOException {
  4.         int written = records.writeFullyTo(channel);
  5.         size.getAndAdd(written);
  6.         return written;
  7.     }
复制代码

四、总结

本章,我带大家回顾了Kafka的日志结构,然后对Log Subsystem的核心组件进行了团体分析,并找到了日志写入的入口,分析了日志写入的团体流程。下一章开始,我将逐一分析日志写入过程中涉及的各个核心组件。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表