手记

无镜--kafka之存储(二)

日志管理

消息代理节点的数据目录(log.dirs)可以设置多个目录,代理节点负责的所有分区分布在多个目录中。一个数据目录下可以有多个分区,一个分区只对应一个日志目录(logDir),同一个分区只会在其中的一个数据目录中,不会同时存在多个数据目录中。

kafka服务启动时会创建一个日志管理类,日志管理类采用线程池方式加载所有的日志,为每个日志的加载都创建一个单独的线程,每个日志会加载所有的日志分段。加载日志的任务本身是阻塞的。加载完毕之后,会调用startup()方法,加载四个管理线程:定时将所有数据目录所有日志的检查点写到检查点文件中;定时刷新还没有写到磁盘上的日志;定时清理失效的日志分段,并维护日志的大小;定时压缩日志,相同键不同值的消息只保存最近一条;

检查点文件

消息代理节点用多个数据目录存储所有的分区日志,每个数据目录都有一个全局的检查点文件,检查点文件会存储这个数据目录下所有日志的检查点信息。检查点是日志已经刷新到磁盘的位置,在分布式存储系统中主要用于故障的恢复(这个检查点文件就是恢复点检查点文件)。

kafka启动创建日志管理类,读取检查点文件,并把每个分区对应的检查点作为日志的恢复点,最后创建分区对应的日志实例。

消息追加到分区对应的日志后,最后在刷新日志时,将最新的偏移量作为日志的检查点。

日志管理启动一个定时任务:读取所有日志的检查点,并写入全局的检查点文件。

刷新日志

日志在没有刷写之前,数据保存在操作系统的页面缓存中,日志管理器启动时会定时调度方法,定期将页面缓存中的数据真正的写到磁盘的文件中。这样操作比直接就把数据写到文件中要快很多,但是也会存在风险,在数据未写入磁盘文件之前,节点宕掉了就会导致数据丢失(有副本机制,风险降低很多)。

刷盘的策略有两种:时间和大小。

时间:日志管理器启动一个定时器,每隔log.flush.interval.ms的时间执行一次刷写动作。

大小:仅有当有新消息产生时才有可能有机会调用刷写日志:追加信息到日志时,如果新创建了日志分段,立即刷新旧的日志分段;日志中未刷新的消息数量超过log.flush.interval.messages配置的值,立即执行一次刷写动作。

刷新日志方法的参数是日志的最新偏移量(logEndOffset),它要和日志中现有的检查点位置进行比较,只有最新偏移量比检查点位置大,才需要刷新。由于一个日志有多个日志分段,所以刷新日志时,会刷新从检查点位置到最新偏移量的所有日志分段,最后更新检查点位置。

清理日志

为了控制日志中所有的日志分段的总大小不超过阀值(log.retention.bytes),日志管理器定期清理旧的日志分段,从最旧的日志分段开始清理。有两种策略:删除和压缩。

删除:根据时间或者大小直接物理删除整个日志分段。

压缩:不直接删除日志分段,针对每个键采用合并压缩的方式。

策略可以设置为全局的,也可以针对主题进行单独设置,如果消息没有键,那只能按照删除策略设置

删除日志的思路是:将当前最新的日志大小减去下一个即将删除的日志分段大小,如果结果超过阔值,则允许删除下一个日志分段;如果小于阔值,则不会删除下一个日志分段 。

删除日志分段是一个异步操作,在执行异步删除之前,要先将日志分段从映射表中删除,再将日志分段中的数据文件和索引文件添加上.deleted后缀,最后才调度异步删除日志分段的任务。

日志实例中涉及日志分段的修改,比如追加消息(append() )、删除(delete())、滚动创建( roll())、刷新(flush() )、截断(truncateTo())、替换(replaceSegments())、关闭(close() )、 加 载( loadSegments ())等操作,都会用一个对象锁保证同步。

日志压缩

kafka是用追加的形式把消息集写到文件中的,如果消息集带有键,是不会去查询键是否存在的,因为太耗性能,这样写操作性能就很高,后期后台线程定期对相同的键进行合并,保留偏移量大的消息。压缩的工作就是:删除旧的,只保留最近的一次。

日志压缩的原则是不影响写操作,所以除了当前活动的日志分段外,其余剩下的日志分段全部参与压缩合并。因为每个文件目录是一个分区,不同分区有不同的文件目录,每个分区下的文件目录下的日志分段不会太多,所以一次性合并所有的旧的日志分段问题不大。

清理点

日志压缩会将旧日志分段的消息,复制到新日志分段上。为了减少复制过程中的内存开销,在开始压缩之前,将日志按照清理点分成日志尾部和日志头部。每完成一次压缩后,清理点就更新为压缩之前的那个日志分段的基准偏移量。

日志头部的范围:从没有清理的偏移量开始到活动日志分段的基准偏移量。

日志尾部的范围:从开始清理的偏移量到清理点。

例如:

第一次日志压缩,清理点为0。这时的头部就是0到活动日志的基准偏移量13,没有尾部。

第一次日志压缩之后,清理点更新为压缩之前的活动日志的基准偏移量13。

第二次日志压缩,清理点为13。这时的头部是13到现在活动的日志分段基准偏移量20,尾部是0到13。

第二次日志压缩之后,清理更新为第二次压缩之前的活动的日志分段基准偏移量20。注意这里第二次压缩之后,日志尾部中消息如果有键重复的,就已经合并了。

第三次日志压缩,清理点为20,这时的头部是20到现在活动的日志分段基准偏移量36,尾部是2到20。

以此类推。说明:第三次的时候日志尾部为什么是从偏移量2开始的?因为有可能在第二次压缩的时候,偏移量0的消息的键和后面偏移量为2的消息的键重复了,就合并了,保留了最近的更新。删除了偏移量为0的消息。由此可以看出,日志尾部的偏移量是稀疏的,但总体上是递增的。被保留的消息即使复制到了新的日志分段,也不会改变消息的偏移量,不会对消息进行重新排序;日志压缩后消息的物理位置会发生变化,因为复制到了新的日志分段是使用新的数据文件。

可以看出日志尾部是清理过的,日志头部是从未清理过的。

在日志压缩合并的同时,客户端也会读取日志,如果客户端读取的进度每次都赶在压缩之后还在读日志尾部的消息,那么客户端就不会读取完整的消息;如果客户端读取的进度每次读取的是日志头部的消息,那么客户端就会读取完整的消息。(这里说的完整的消息是指:新旧的消息一起都会读取到,不是指消息本身的完整)

删除点

如果一个消息带有键,但是内容为null,表示这条删除消息所在偏移量之前的所有消息都需要删除。这个删除消息叫做删除点。因为每条消息都会有多个副本,所以删除点除了追加到主副本上日志分段上,也需要复制并保持到其他节点的备份副本上。

日志压缩会将上一次压缩后的多个小文件合并为一组,压缩成新的文件。日志压缩后,新的日志分段不会更改每条消息的偏移量,也不会更改文件的最近修改时间。

在第二次压缩之后,就需要考虑删除点是否需要保留,因为压缩之后的消息是被重组的。那么原来在连续偏移量下增加的删除点是否还需要保留。保留的话必须满足以下条件:日志分段的最近修改时间大于deleteHorizoMs。deleteHorizoMs的计算方式是:日志头部起始位置前的最后一个日志分段的最近修改时间减去保留阀值(delete.retention.ms)。只要大于就删除日志分段中的删除点。

日志清理的管理器与清理线程

日志管理器(LogManager)除了管理日志的常用操作,也管理一个日志清理器(LogCleaner),日志清理器通过管理器(LogCleanerManager)选择出需要清理的日志(LogToClean),并将具体的清理动作交给清理线程(CleanerThread)完成。

来之《 Kafka技术内幕:图文详解Kafka源码设计与实现 》: 日志清理器的线程模型

日志管理需要把数据目录(logDirs)和所有的日志(logs)作为参数,传递给日志清理管理器(LogCleanerManager)。每个数据目录都有一个清理点检查点文件,来记录每个日志的最近一次清理点位置。日志清理管理器在选择日志时,会读取每个日志的清理点,然后选择最需要清理的日志进行清理,清理完成后将这个日志的最新清理点写入清理点检查点文件中。

清理线程(CleanerThread)每次运行,都会让日志清理管理器(LogCleanerManager)选择一个最需要清理的日志;清理线程对应的清理者(Cleaner)每次也只会清理一个日志。每个分区的日志都对应一个LogToClean对应。

什么情况下才是最需要清理的日志喃:日志头部大小除以日志的大小(日志头部加上日志尾部),然后选择比率最大的那个分区对应的日志。

日志压缩的具体步骤:

1,消息追加到活动的日志分段,选择活动日志分段之前的所有日志分段参与日志压缩。

2,为日志头部构建一张消息键到偏移量的映射表,相同键但偏移量低于映射表的消息会被删除。

3,通过复制消息的方式,将需要保持的消息复制到新的日志分段,每条键都只有一条最新的消息。

4,复制完成后,新的日志分段会代替所有参与压缩操作的旧日志分段。

5,更新日志的清理点,为下一个日志压缩做准备,清理点将日志分为日志头部和日志尾部。

每一次的日志压缩都是用日志头部里面的消息键和日志尾部中的消息键进行比较,如果日志尾部中的消息键在日志头部中存在,那么日志尾部中的消息就需要被删除掉。



作者:吉之无镜
链接:https://www.jianshu.com/p/f8c24b3b4f1a


0人推荐
随时随地看视频
慕课网APP