Kafka服务端日记详解

打印 上一主题 下一主题

主题 1015|帖子 1015|积分 3045

服务端日记

Kafka的日记信息是通过conf/server.properties文件中的log.dirs设置项来设置的




Topic消息存储方式

主体介绍

进入到上方设置文件中指定的目次下检察,topic的数据都是以topic名 + partition下标的命名方式生存的




我们如今进入其中一个partition目次






  • .index
    日记索引文件,采用的稀疏索引进步查询服从,纪录的是消息偏移量offset 和 该消息在.log文件中的位置position
  • .log
    消息生存在.log文件中,是以二进制的方式生存的。可以通过.index和.timeindex两个索引文件加快查找消息
    文件大小是(log.segment.bytes参数设定)默认1GB,新文件名是第一条消息的offset.log
  • .timeindex
    日记索引文件,采用的也是稀疏索引结构,每隔一段时间生存一条索引纪录,纪录的是消息产生的时间戳timestamp和消息偏移量offset
  • .snapshot
    快照,可以理解为一个备份文件
  • leader-epoch-checkpoint
    Leader partition新上任就会往该文件中写入一个epoch,来保证HW的一致性
  • partition.metadata
    生存着该partition对应的topic_id



最后两个文件可以直接检察,就纪录的一些简单的信息
  1. [root@worker1 disTopic-0]# cat leader-epoch-checkpoint
  2. 0
  3. 3
  4. 5 0
  5. 9 6
  6. 14 18
  7. [root@worker1 disTopic-0]# cat partition.metadata
  8. version: 0
  9. topic_id: rDUdZBO7RH2GNPgdRXk7Tw
复制代码



而前三个文件是以二进制的方式生存的,需要通过Kafka提供的kafka-dump-log.sh来检察文件内容,如下所示
  1. # 保存的是消息产生的时间戳 和 消息offset
  2. [root@worker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.timeindex
  3. Dumping 00000000000000000000.timeindex
  4. timestamp: 1723254597947 offset: 51
  5. timestamp: 1723254598224 offset: 102
  6. timestamp: 1723254598501 offset: 152
  7. timestamp: 1723254598816 offset: 201
  8. timestamp: 1723254599085 offset: 250
  9. # 保存是的消息的offset 和 该消息在.log文件中对应的position位置
  10. [root@worker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.index
  11. Dumping 00000000000000000000.index
  12. offset: 51 position: 4160
  13. offset: 102 position: 8324
  14. offset: 152 position: 12428
  15. offset: 201 position: 16544
  16. offset: 250 position: 20660
  17. offset: 299 position: 24776
  18. # 每一条记录保存的是一批消息信息,只不过我下面刚好都只保存一条消息
  19. [root@worker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.log
  20. baseOffset: 50 lastOffset: 50 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4080 CreateTime: 1723254597907 size: 80 magic: 2 compresscodec: none crc: 672861010 isvalid: true
  21. baseOffset: 51 lastOffset: 51 count: 1 baseSequence: 1 lastSequence: 1 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4160 CreateTime: 1723254597947 size: 80 magic: 2 compresscodec: none crc: 3136762717 isvalid: true
  22. baseOffset: 52 lastOffset: 52 count: 1 baseSequence: 2 lastSequence: 2 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4240 CreateTime: 1723254597951 size: 80 magic: 2 compresscodec: none crc: 3076149845 isvalid: true
复制代码



log文件追加纪录消息



  • .log文件是以追加的方式写入新的消息日记。position表现写入位置,size表现消息的总长度,通过这两个值就能从一段二进制中获取到一条具体的消息。
  • Kafka中的消息日记,只允许追加,不允许删除和修改。
  • log文件的固定大小是log.segment.bytes参数设定,默认1GB。新创建的文件是以写入第一条消息的offset作为的文件名



index和timeindex索引文件

index和timeindex索引文件目的都是为了加快从log文件中读取消息的服从,如下图所示,






  • index和timeindex文件是以相对偏移量的方式建立的log消息日记数据索引
    比如说0000.index和 0947.index索引文件中的内容offset都是以0开始计数的,使用的是第一条消息的相对偏移量,而消息绝对偏移量=文件名+相对偏移量
  • index和timeindex文件采用的是类似于数据的跳表,并不是每一条消息都会纪录一条索引。
    由log.index.interval.bytes决定.log文件中产生多少大小的消息就天生一条index纪录 官网 服务端的参数说明
    1. log.index.interval.bytes
    2. The interval with which we add an entry to the offset index
    3. Type:        int
    4. Default:        4096 (4 kibibytes)
    复制代码



index文件的作用类似于数据结构中的跳表,他的作用是用来加快查询log文件的服从。而timeindex文件的作用则是用来进行一些跟时间相干的消息处理处罚。比如文件清算。



日记文件清算

Kafka为了防止过多的日记文件给服务器带来过大的压力,他会定期删除过期的log文件。



判定那些日记文件过期了


  • log.retention.check.interval.ms
    定时检测文件是否过期。默认是 300000毫秒,也就是五分钟。 在检查文件是否超时时,是以每个.timeindex中最大的那一条纪录为准。
  • log.retention.hours , log.retention.minutes, log.retention.ms 。
    这一组参数表现文件保存多长时间。默认见效的是log.retention.hours,默认值是168小时,也就是7天。如果设置了更高的时间精度,以时间精度最高的设置为准。
官网 服务端的参数说明
  1. # 日志清除程序检查是否有日志符合删除条件的频率(以毫秒为单位)
  2. log.retention.check.interval.ms
  3. Type:        long
  4. Default:        300000 (5 minutes)
  5. # 在删除日志文件之前保留它的小时数(以小时为单位),仅次于log.retention.ms属性
  6. log.retention.hours
  7. Type:        int
  8. Default:        168
  9. # 在删除日志文件之前保留日志文件的分钟数(以分钟为单位),仅次于log.retention.ms属性。如果没有设置,则使用log.retention.hours中的值
  10. log.retention.minutes
  11. Type:        int
  12. Default:        null
  13. # 日志文件删除前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes中的值。如果设置为-1,则不应用时间限制。
  14. log.retention.ms
  15. Type:        long
  16. Default:        null
复制代码



过期的日记文件如何处理处罚
  1. # 日志清理策略。有两个选项,delete表示删除日志文件。 compact表示压缩日志文件。
  2. log.cleanup.policy
  3. Type:        list
  4. Default:        delete
  5. Valid Values:        [compact, delete]
  6. # 日志删除前的最大容量
  7. # 当log.cleanup.policy选择delete时  当总的日志文件大小超过这个阈值后,就会删除最早的日志文件。默认是-1,表示无限大。
  8. log.retention.bytes
  9. Type:        long
  10. Default:        -1
复制代码



Kafka的文件高效读写机制

Kafka的文件结构

kafka的数据文件结构可以加快日记文件的读取。
Topic下的多个partition采用的是单独纪录日记文件,如许加快了topic下的数据读取
通过.index索引文件的稀疏索引结构,进一步加快日记检索速率。



次序写磁盘

对每个Log文件,Kafka会提前规划固定的大小,如许在申请文件时,可以提前占据一块连续的磁盘空间。
Kafka的log文件只能以追加的方式往文件的末了添加(这种写入方式称为次序写)



零拷贝

零拷贝是Linux操作系统提供的一种IO优化机制,而Kafka大量的运用了零拷贝机制来加快文件读写。
零拷贝就是配合内核态的复制机制,减少用户态和内核态之间的内容拷贝
传统的一次硬件IO是如许工作的。如下图所示:




零拷贝紧张有两种实现机制
1、mmap文件映射机制
不再将整个文件复制进用户态,而是用户态只持有一个文件的映射信息,通过这个映射信息控制内核态的文件读写。
java中大量使用该方式 ,可以参考下JDK中的DirectByteBuffer实现机制
适用于文件不凌驾2G的文件,以是Kafka将日记文件设计成1G




2、sendfile文件传输机制
用户态连文件索引都不读取,直接向内核态发送一个sendfile指令,让内核态去进行文件拷贝
例如当Consumer要从Broker上poll消息时,Broker不需要对消息进行任何的加工,用户态就只需要往内核态发一个sendfile指令,而不需要有任何的数据拷贝过程。Kafka大量的使用了sendfile机制,用来加快对当地数据文件的读取过程。

   JDK中8中java.nio.channels.FileChannel类提供了transferTo和transferFrom方法,底层就是使用了操作系统的sendfile机制。
  


合理设置刷盘频率

应用程序读取文件是从内核态的pageCache中读取的,生存文件是终极只能生存到pageCache中,应用程序不能直接操作pageCache。而pageCache中的数据如果还没有来得及刷盘持久化到磁盘,服务器突然非正常断电,那么pageCache中的数据就会丢失。
应用程序唯一能做的就是频繁的调用OS提供的fsync()关照OS进行刷盘操作,但是则会降低应用的执行性能。以是应用程序需要在数据安全和高性能上做弃取。
Kafka在服务端设计了几个参数,来控制刷盘的频率:
这里可以看到,Kafka为了最大化性能,默认是将刷盘操作交由了操作系统进行统一管理。
  1. # 多长时间进行一次强制刷盘。默认是Long.MAX。
  2. flush.ms
  3. Type:        long
  4. Default:        9223372036854775807
  5. Valid Values:        [0,...]
  6. # 表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
  7. log.flush.interval.messages
  8. Type:        long
  9. Default:        9223372036854775807
  10. Valid Values:        [1,...]
  11. #当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
  12. log.flush.interval.ms
  13. Type:        long
  14. Default:        null
  15. # 日志刷新程序检查是否需要将日志刷新到磁盘的频率(以毫秒为单位),默认也是Long.MAX。
  16. log.flush.scheduler.interval.ms
  17. Type:        long
  18. Default:        9223372036854775807
复制代码



客户端消耗进度管理

消耗者消耗消息的进度被生存在一个名称为__consumer_offsets内置的topic中,该topic默认会创建50个分区partition




该topic在zookeeper也能检察到相应的信息,只不过zookeeper上只是简单纪录了partition的Leader和ISR列表,并没有看见真实消耗者的消耗进度




既然这也是一个topic下的partition,我们启动一个消耗者来消耗其中的消息看看
可以看到下面纪录的消息内容就是一个key-value的格式,key为消耗者组+topic+partition 而value生存则offset和一些元数据信息
也就是说这里纪录了消耗者组在某个topic下的partition的消息消耗偏移量offset
  1. [root@worker1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
  2. [test,disTopic,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[5], metadata=, commitTimestamp=1723081907174, expireTimestamp=None)
  3. [test,disTopic,3]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[7], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
  4. [test,disTopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[1], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
  5. [test,disTopic,2]::OffsetAndMetadata(offset=11, leaderEpoch=Optional[4], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
  6. [test,disTopic,0]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[5], metadata=, commitTimestamp=1723081907275, expireTimestamp=None)
  7. [test,disTopic,3]::OffsetAndMetadata(offset=6, leaderEpoch=Optional[7], metadata=, commitTimestamp=1723081907377, expireTimestamp=None)
  8. [test,disTopic,1]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[1], metadata=, commitTimestamp=1723081907377, expireTimestamp=None)
复制代码



而这些Offset数据,实在也是可以被消耗者修改的,在之前章节已经演示过消耗者如何从指定的位置开始消耗消息。而一旦消耗者主动调整了Offset,Kafka当中也会更新对应的纪录。
另外,这个系统Topic里面的数据是非常紧张的,因此Kafka在消耗者端也设计了一个参数来控制这个Topic应该从订阅关系中剔除。
  1. public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
  2. private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether internal topics matching a subscribed pattern should " +
  3.     "be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
  4. public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天空闲话

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表