Jafka源代码分析——LogManager




Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log)。通过阅读源代码可知其具体完成的功能如下:

1. 按照预设规则对消息队列进行清理。

2. 按照预设规则对消息队列进行持久化(flush操作)。

3. 连接ZooKeeper进行brokertopicpartition相关的ZooKeeper操作。

4. 管理broker上所有的Log

下面一一对这些功能的实现进行详细的解析。

一、对于Log的管理

LogManager包含成员变量logslogskeytopicvaluePool<Integer,Log>(该value又是一个Map,主键是partitionvalue是该partition所对应的Log)。因此LogManager通过logs保存该broker上所有的消息队列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之后,需要根据配置文件配置的消息队列根目录进行遍历。通过遍历,查找并生成Log。该遍历的具体实现在方法load中:

① 获取消息队列根目录下的所有文件

② 对于根目录下的每一个文件进行如下操作

 1.如果是目录,则有可能是一个Log,否则不是并忽略

 2.对于通过1的目录分析其文件名,目录的文件名由两部分组成:topic-partition

 3.对于通过2的目录,用目录、解析出的topic、解析出的partition生成Log

 4.3生成的Log放入logs日志池

 5.最后,判断目录解析的partition与配置文件中配置的partition的大小,如果配置文件较小,则更新配置


二、消息队列清理

消息队列的清理由Scheduler周期性的调用,具体的调用在load函数中,主要的删除实现在cleanLogs函数中。消息队列的清理分为两种情况:一种是超过预设的时间则删除,二是超过预设的大小则删除,分别对应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比较简单,因为每一个segment对应一个文件,通过对比文件的lastModifiedTime和系统的现在时间来确定其是否超时,如果超时则删除。对于第二种情况,首先比较Log的大小与配置的大小。如果小于配置的大小则不删除;如果大于了配置的大小,则计算超过配置大小的长度(定为差值);然后将小于该差值的segment删除(这地方有点疑惑,这样删除会不会把一些最新的消息队列给删除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}


三、对于消息队列的持久化

对消息队列的flush操作同样由单独的线程来完成。该线程通过比较Log上一次的flush时间和当前的系统时间来确定是否需要flush,如果需要则持久化到文件。注意,消息的队列的持久化在新增消息的时候也会判断,如果一个Log保存的新增消息的条数超过了预设值则进行flush操作。


Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log)。通过阅读源代码可知其具体完成的功能如下:

1. 按照预设规则对消息队列进行清理。

2. 按照预设规则对消息队列进行持久化(flush操作)。

3. 连接ZooKeeper进行brokertopicpartition相关的ZooKeeper操作。

4. 管理broker上所有的Log

下面一一对这些功能的实现进行详细的解析。

一、对于Log的管理

LogManager包含成员变量logslogskeytopicvaluePool<Integer,Log>(该value又是一个Map,主键是partitionvalue是该partition所对应的Log)。因此LogManager通过logs保存该broker上所有的消息队列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之后,需要根据配置文件配置的消息队列根目录进行遍历。通过遍历,查找并生成Log。该遍历的具体实现在方法load中:

① 获取消息队列根目录下的所有文件

② 对于根目录下的每一个文件进行如下操作

 1.如果是目录,则有可能是一个Log,否则不是并忽略

 2.对于通过1的目录分析其文件名,目录的文件名由两部分组成:topic-partition

 3.对于通过2的目录,用目录、解析出的topic、解析出的partition生成Log

 4.3生成的Log放入logs日志池

 5.最后,判断目录解析的partition与配置文件中配置的partition的大小,如果配置文件较小,则更新配置


二、消息队列清理

消息队列的清理由Scheduler周期性的调用,具体的调用在load函数中,主要的删除实现在cleanLogs函数中。消息队列的清理分为两种情况:一种是超过预设的时间则删除,二是超过预设的大小则删除,分别对应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比较简单,因为每一个segment对应一个文件,通过对比文件的lastModifiedTime和系统的现在时间来确定其是否超时,如果超时则删除。对于第二种情况,首先比较Log的大小与配置的大小。如果小于配置的大小则不删除;如果大于了配置的大小,则计算超过配置大小的长度(定为差值);然后将小于该差值的segment删除(这地方有点疑惑,这样删除会不会把一些最新的消息队列给删除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}


三、对于消息队列的持久化

对消息队列的flush操作同样由单独的线程来完成。该线程通过比较Log上一次的flush时间和当前的系统时间来确定是否需要flush,如果需要则持久化到文件。注意,消息的队列的持久化在新增消息的时候也会判断,如果一个Log保存的新增消息的条数超过了预设值则进行flush操作。


相关内容