手记

消息持久化存储源码解析

一、原理

1、消息存在哪了?

消息持久化的地方其实是磁盘上,在如下目录里的commitlog文件夹里。

/root/store/commitlog

源码如下:

// {@link org.apache.rocketmq.store.config.MessageStoreConfig}

// 数据存储根目录
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
// commitlog目录
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog";
// 每个commitlog文件大小为1GB,超过1GB则创建新的commitlog文件
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;

比如验证下:

[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# pwd
/root/store/commitlog
[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]# ll -h
total 400K
-rw-r--r-- 1 root root 1.0G Jun 30 18:21 00000000000000000000
[root@iZ2ze84zygpzjw5bfcmh2hZ commitlog]#

可以清晰的看到文件大小是1.0G,超过1.0G再写入消息的话会自动创建新的commitlog文件。

2、关键类解释

2.1、MappedFile

对应的是commitlog文件,比如上面的00000000000000000000文件。

2.2、MappedFileQueue

MappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列。

2.3、CommitLog

针对 MappedFileQueue 的封装使用。

二、Broker接收消息

1、调用链

BrokerStartup.start() -》 BrokerController.start() -》 NettyRemotingServer.start() -》 NettyRemotingServer.prepareSharableHandlers() -》 new NettyServerHandler() -》 NettyRemotingAbstract.processMessageReceived() 
-》 NettyRemotingAbstract.processRequestCommand() -》 SendMessageProcessor.processRequest()

2、processRequest

SendMessageProcessor.processRequest()

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        // 调用asyncProcessRequest
        response = asyncProcessRequest(ctx, request).get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("process SendMessage error, request : " + request.toString(), e);
    }
    return response;
}


0人推荐
随时随地看视频
慕课网APP