消息文件存储结构
与 Kafka 类似,RocketMQ 选择直接操作文件系统来提升存储效率,不同的是,RocketMQ 将消息持久化过程最大化的转化为顺序写。 RocketMQ 默认存储路径为 $HOME/store,相关文件目录结构如下。
Kafka 以 Topic 作为文件存储的基本单元,即每个 Topic 有其对应的数据文件和索引文件。当存在大量 Topic 时,消息持久化逐渐变成一种随机写磁盘的行为,此时磁盘 IO 成为影响系统吞吐量的主要因素。针对上述问题,RocketMQ 首先将消息的写入转化为顺序写
,即所有 Topic 的消息均写入同一个文件(CommitLog
)。同时,由于消息仍需要以 Topic 为维度进行消费,因此 RocketMQ 基于 CommitLog 为每个 Topic 异步构建多个逻辑队列(ConsumeQueue)
和索引信息(Index)
:ConsumeQueue 记录了消息在 CommitLog 中的位置信息;给定 Topic 和消息 Key,索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用。ConsumeQueue 和 Index 的构建依然遵循顺序写。
RocketMQ 利用 mmap
将文件直接映射到用户态内存地址,由此将对文件的 IO 转化为对内存的 IO。由于使用 mmap 必须明确内存映射的大小,因此 RocketMQ 约定:单个 CommitLog 文件大小等于 1 GB,每条消息及其元信息被顺序追加至文件,文件尾部可能存在空闲区域;单个 ConsumeQueue 文件大小等于 6000000 B,存储 30 W 条记录,每条记录固定 20 B;单个 Index 文件大小等于 420000040 B,包含索引头(IndexHeader)、哈希槽(HashSlot)和消息索引(MessageIndex)。同时,CommitLog 和 ConsumeQueue 以字节偏移量作为文件名,因此第二个 CommitLog 的文件名为 1024 * 1024 * 1024 = 00000000001073741824,而第二个 ConsumeQueue 的文件名为 20 * 30 W = 00000000000006000000。
RocketMQ 以如下图所示存储格式将消息顺序写入 CommitLog。除了记录消息本身的属性(消息长度、消息体、Topic 长度、Topic、消息属性长度和消息属性),CommitLog 同时记录了消息所在消费队列的信息(消费队列 ID 和偏移量)。由于存储条目具备不定长的特性,当 CommitLog 剩余空间无法满足消息时,CommitLog 在尾部追加一个 MAGIC CODE 等于 BLANK_MAGIC_CODE 的存储条目作为结束标记,并将消息存储至下一个 CommitLog 文件。
与 CommitLog 不同,ConsumeQueue 的存储条目采用定长存储结构,如下图所示。为了实现定长存储,ConsumeQueue 存储了消息 Tag 的 Hash Code,在进行 Broker 端消息过滤时,通过比较 Consumer 订阅 Tag 的 HashCode 和存储条目中的 Tag Hash Code 是否一致来决定是否消费消息。 在已有的 CommitLog 和 ConsumeQueue 基础之上,消息中间件的消息发送和消费逻辑已经可以得到满足,RocketMQ 引入 Index 的目的是为消息建立索引方便问题排查:在给定消息 Topic 和 Key 的前提下,快速定位消息。Index 的文件存储结构如下图所示。Index 的整体设计思想类似持久化在磁盘的 HashMap,同样使用链式地址法解决哈希冲突:每个 Hash Slot 关联一个 Message Index 链表,多个 Message Index 通过 preIndexOffset 连接。 本节讨论了 RocketMQ 消息存储相关的核心文件存储结构。无论是 CommitLog,ConsumeQueue 还是 Index,RocketMQ 均使用统一的 MappedFile 来抽象。本文将讨论 RocketMQ 是如何围绕 MappedFile 并结合内存映射来构建 CommitLog(本文对 ConsumeQueue 和 Index 的异步构建不进行讨论)。
消息存储流程
在启动阶段,Broker 将消息处理器注册至核心控制器(BrokerController),Broker 根据请求的 RequestCode 将请求路由至对应的消息处理器。相比于 NameServer 将所有网络通信交由单一的消息处理器处理,Broker 定义了八种消息处理器(AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor、ForwardRequestProcessor、PullMessageProcessor、QueryMessageProcessor 和 SendMessageProcessor)。其中,SendMessageProcessor 负责处理消息发送请求,其注册相关的核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// org.apache.rocketmq.broker.BrokerStartup#main
public static void main(String[] args) {
start(createBrokerController(args));
}
// org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {
...
try {
// Configuation initialization.
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
...
boolean initResult = controller.initialize();
...
return controller;
} catch (Throwable e) {
...
}
return null;
}
// org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
...
// Create several thread pool service.
this.registerProcessor();
// Create and execute a periodic action.
...
}
// org.apache.rocketmq.broker.BrokerController#registerProcessor
public void registerProcessor() {
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendProcessor,
this.sendMessageExecutor);
...
}
SendMessageProcessor 实现了 NettyRequestProcessor 接口,其对消息的核心处理逻辑实现在 processRequest 方法中,RocketMQ 发送消息包括单个消息发送和批量消息发送,本节以单个消息发送为例进一步说明。不同 Topic 的消息最终均被顺序持久化至共享的 CommitLog,CommitLog 由固定大小的文件队列组成,文件队列被定义为 MappedFileQueue,MappedFileQueue 中每个文件被定义为 MappedFile,每个MappedFile 对应一个具体的文件用于将消息持久化至磁盘。CommitLog、MappedFileQueue 和 MappedFile 之间的依赖关系如下所示。
Broker 启动阶段 DefaultMessageStore 被初始化,DefaultMessageStore 是 RocketMQ 消息存储的抽象,提供 CommitLog 的维护、ConsumeQueue & Index 的异步构建(ReputMessageService)、MappedFile 内存映射的分配(AllocateMappedFileService)、HA(HAService) 等保障。DefaultMessageStore 通过 putMessage 方法将消息存储至 CommitLog,
核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// org.apache.rocketmq.store.DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
//当前broker停止工作
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
//SLAVE节点
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
//
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
//不可写
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
//大于127
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
//繁忙
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
//这里真正putMsg
PutMessageResult result = this.commitLog.putMessage(msg);
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
在将消息存储至 CommitLog 之前,需要校验 DefaultMessageStore 状态、当前 Broker 节点角色、DefaultMessageStore 是否允许写入、Topic 和 Properties 是否超长和 PageCache 是否繁忙。校验通过后,消息交由 CommitLog 的 putMessage 方法将消息 append 至 MappedFileQueue 中最后一个 MappedFile。putMessage 方法核心流程(暂不涉及延迟消息)包括:尝试获取最后一个 MappedFile,然后通过对 CommitLog 加锁
将 append CommitLog 限定为一种串行操作,保证其顺序写
。如果没有获取到 MappedFile 或者 MappedFile 已满,创建新的 MappedFile;将消息 append 至 MappedFile,如果返回结果为 END_OF_FILE,说明 MappedFile 已经没有足够的剩余空间,创建新的 MappedFile 并将消息重新 append 至新 MappedFile;释放 CommitLog 锁。上述逻辑核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
...
AppendMessageResult result = null;
...
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
...
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
...
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
...
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case ...
}
...
} finally {
putMessageLock.unlock();
}
// Broker 刷盘策略以及 HA 机制
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
进一步理解创建 MappedFile 的原理。DefaultMessageStore 初始化期间启动 AllocateMappedFileService
线程。当需要创建 MappedFile 时,首先基于 startOffset 计算 MappedFile 文件名,包括两种场景:如果此时不存在 MappedFile,意味着当前为第一次消息投递或者历史 MappedFile 已经被清理,createOffset 不能简单等于 startOffset % mappedFileSize,以避免 MappedFile 文件名无限制增长;如果此时最后一个 MappedFile 已存在且已满,createOffset 等于最后一个 MappedFile 的 fromOffset + mappedFileSize。消息处理线程基于 createOffset 构建两个连续的 AllocateRequest 并插入 AllocateMappedFileService 线程维护的 requestQueue
。AllocateMappedFileService 线程读取 requestQueue 中的 AllocateRequest 异步创建对应的 MappedFile。在创建过程中,消息处理线程通过 CountDownLatch 同步等待 MappedFile 完成创建。思考一个问题,消息处理线程为什么不直接同步创建 MappedFile,而是通过创建一个 AllocateRequest 请求,由 AllocateMappedFileService 线程异步统一处理?构建 AllocateRequest 并插入 requestQueue 的核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile
public MappedFile getLastMappedFile(final long startOffset) {
return getLastMappedFile(startOffset, true);
}
// org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator +
UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile =
this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
}
...
return mappedFile;
}
return mappedFileLast;
}
// org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
...
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
if (nextPutOK) {
...
boolean offerOK = this.requestQueue.offer(nextReq);
...
canSubmitRequests--;
}
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK =
this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
...
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
...
}
}
...
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
boolean waitOK = result.getCountDownLatch().await(waitTimeOut,
TimeUnit.MILLISECONDS);
if (!waitOK) {
return null;
} else {
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
回答上面提出的问题,AllocateMappedFileService 线程循环从 requestQueue 获取 AllocateRequest,AllocateRequest 实现了 Comparable 接口,依据文件名从小到大排序。当需要创建 MappedFile 时,同时构建两个 AllocateRequest,消息处理线程通过 CountDownLatch 将 AllocateMappedFileService 线程异步创建第一个 MappedFile 文件转化为同步操作
(RocketMQ 存在大量利用 CountDownLatch 将异步转化为同步的案例),而第二个 MappedFile 文件的仍然创建交由 AllocateMappedFileService 线程异步创建。当消息处理线程需要再次创建 MappedFile 时,此时可以直接获取已创建的 MappedFile。AllocateMappedFileService 线程创建 MappedFile 核心逻辑精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// org.apache.rocketmq.store.AllocateMappedFileService#run
public void run() {
while (!this.isStopped() && this.mmapOperation()) {}
}
// org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
...
if (req.getMappedFile() == null) {
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(),
messageStore.getTransientStorePool());
} catch (RuntimeException e) {
...
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(),
messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
...
req.setMappedFile(mappedFile);
...
}
} catch (InterruptedException e) {
...
} catch (IOException e) {
...
} finally {
...
req.getCountDownLatch().countDown();
}
return true;
}
继续理解 MappedFile 的创建原理。
- 如果 isTransientStorePoolEnable 为 true,MappedFile 会将TransientStorePool 申请的堆外内存(
Direct Byte Buffer
)空间作为 writeBuffer,写入消息时先将消息写入 writeBuffer,然后将消息提交至fileChannel
再flush
; - isTransientStorePoolEnable 为 false,直接创建 MappedFile 内存映射文件字节缓冲区
mappedByteBuffer
,将消息写入 mappedByteBuffer 再flush
。完成消息写入后,更新 wrotePosition(此时还未 flush 至磁盘)。 RocketMQ 写入消息时为什么采用两种不同的方式?消息 append 至 MappedFile 核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// org.apache.rocketmq.store.MappedFile#appendMessage
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg,
final AppendMessageCallback cb) {
return appendMessagesInner(msg, cb);
}
// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt,
final AppendMessageCallback cb) {
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ?
writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer,
this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt);
} else {
...
}
this.wrotePosition.addAndGet(result.getWroteBytes());
...
return result;
}
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset,
final ByteBuffer byteBuffer,
final int maxBlank,
final MessageExtBrokerInner msgInner) {
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
...
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
...
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
this.msgStoreItemMemory.putInt(maxBlank);
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(...);
}
// Initialization of storage space
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(...);
...
return result;
}
如果有足够的剩余空间供消息写入,设置 byteBuffer(writeBuffer/mappedByteBuffer)的 position 等于 wrotePosition,执行 byteBuffer 的 put 方法将字节数组写入即可。如果 MappedFile 没有足够剩余空间(msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank),向 byteBuffer 写入 BLANK_MAGIC_CODE 后返回 END_OF_FILE,消息处理线程创建新的 MappedFile 并将消息 append 至 byteBuffer,映射关系如下图所示。
刷盘策略
RocketMQ 刷盘策略分为 commit 和 flush 两阶段,分别由 commitLogService 和 flushCommitLogService 负责,如下图所示:在 commit 阶段,如果 isTransientStorePoolEnable 为 true,数据从 writeBuffer 写入 fileChannel,否则数据仍然驻留在 mappedByteBuffer;在 flush 阶段,将数据从 fileChannel 或者 mappedByteBuffer 持久化至磁盘。 在 CommitLog 构造阶段,commitLogService 被实例化为 CommitRealTimeService,对于同步刷盘,flushCommitLogService 被实例化为 GroupCommitService,对于异步刷盘,flushCommitLogService 被实例化为 FlushRealTimeService。GroupCommitService(SYNC)、CommitRealTimeService 和 FlushRealTimeService(ASYNC) 均继承自 ServiceThread 类并实现了 Runnable 接口,在 Broker 启动后创建线程并循环执行相应操作。CommitLog 构造函数及启动代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// org.apache.rocketmq.store.CommitLog#CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
...
if (FlushDiskType.SYNC_FLUSH ==
defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
...
}
// org.apache.rocketmq.store.CommitLog#start
public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
GroupCommitService 线程将 writeBuffer 中的数据 commit 至 fileChannel,执行 commit 有以下两个触发条件。
- writeBuffer 中待 commit 的数据的页数大于等于 commitCommitLogLeastPages,默认为 4 页,每页大小为 4 KB,即当待 commit 的数据超过 16 KB 时,执行 commit 操作。
- 最近 commitCommitLogThoroughInterval 时间间隔内没有执行 commit 操作,主动执行一次 commit 操作,默认为 200 ms。
上述两个触发条件控制了 writeBuffer 中的数据能够在合并后再写入 fileChannel,提升 IO 性能。commit 操作执行完成后,GroupCommitService 唤醒 flushCommitLogService 线程执行 flush 操作。上述流程对应代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
@Override
public void run() {
while (!this.isStopped()) {
...
int commitDataLeastPages = ...
int commitDataThoroughInterval = ...
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
flushCommitLogService.wakeup();
}
...
} catch (Throwable e) {
...
}
}
...
}
commit 操作本节不再详细讨论,MappedFile 维护的 committedPosition 和 wrotePosition 分别标记了 writeBuffer 中待 commit 数据的起始偏移量和终止偏移量,commit 操作基于此构造 ByteBuffer 并写入 fileChannel。上述流程核心代码如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// org.apache.rocketmq.store.MappedFile#commit0
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
} catch (Throwable e) {
...
}
}
}
相比于 Kafka,除了提供异步刷盘的能力,RocketMQ 还提供了同步刷盘的能力。同步刷盘的实现方式类似于 MappedFile 创建,即构造刷盘请求 GroupCommitRequest 写入请求队列,由异步线程 GroupCommitService 消费请求。对于异步刷盘,如果 isTransientStorePoolEnable 为 true,唤醒 CommitRealTimeService 线程将 writeBuffer 中的数据 commit 至 fileChannel,否则唤醒 FlushRealTimeService 线程将 mappedByteBuffer 的数据刷盘。RocketMQ 发起刷盘的核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result,
PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH ==
this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request =
new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(...);
if (!flushOK) {
...
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
if (!isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
为了避免刷盘请求 GroupCommitRequest 的锁竞争,GroupCommitService 线程维护了 GroupCommitRequest 读队列 requestsRead 和写队列 requestsWrite,GroupCommitRequest 的提交和消费互不阻塞。当 GroupCommitService 线程消费完 requestsRead 队列后,清空 requestsRead,交换 requestsRead 和 requestsWrite
。上述逻辑核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class GroupCommitService extends FlushCommitLogService {
private volatile List<GroupCommitRequest> requestsWrite =
new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead =
new ArrayList<GroupCommitRequest>();
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// Invoke force() to flush fileChannel/mappedByteBuffer to disk.
}
...
this.requestsRead.clear();
} else {
...
}
}
}
public void run() {
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
...
}
}
...
synchronized (this) {
this.swapRequests();
}
this.doCommit();
}
...
}
无论是否开启 isTransientStorePoolEnable,异步刷盘交由 FlushRealTimeService 线程处理。在 handleDiskFlush 方法中,如果 isTransientStorePoolEnable 为 true,仅唤醒了 CommitRealTimeService 线程,但是实际上 CommitRealTimeService 线程在 commit 之后也会唤醒 FlushRealTimeService 线程。FlushRealTimeService 线程维护了 lastFlushTimestamp 以标记上次 flush 磁盘的时间点。 FlushRealTimeService 线程 flush 磁盘的触发条件与 GroupCommitService 线程 commit 数据的触发条件类似: fileChannel 或者 mappedByteBuffer 中待 flush 的数据页大小大于等于 flushPhysicQueueLeastPages 页,默认为 4 页;距上次 flush 磁盘时间间隔超过 flushPhysicQueueThoroughInterval,默认为 10 s。最终 FlushRealTimeService 线程调用 fileChannel 和 mappedByteBuffer 的 force 方法将数据刷盘。
内存映射优化
RocketMQ 利用 mmap 将内核空间的一段内存区域映射至用户空间,映射关系一旦建立,应用程序对这段内存区域的修改可以直接反映到内核空间,反之亦然。相比如 read/write 系统调用,mmap 减少了内核空间和用户空间之间的数据拷贝,在存在大量数据传输的场景下可以有效提升 IO 效率。但是,通过 mmap 建立内存映射仅是将文件磁盘地址和虚拟地址通过映射对应起来,此时物理内存并没有填充磁盘文件内容。当实际发生文件读写时,产生产生缺页中断并陷入内核,然后才会将磁盘文件内容读取至物理内存。针对上述场景,RocketMQ 设计了 MappedFile 预热机制。
回顾 MappedFile 的创建流程,AllocateMappedFileService 线程轮询 AllocateRequest 请求队列并创建MappedFile,此时文件系统中已经存在对应的固定大小的文件。当 RocketMQ 开启 MappedFile 内存预热(warmMapedFileEnable),且 MappedFile 文件映射空间大小大于等于 mapedFileSizeCommitLog(1 GB) 时,调用 warmMappedFile 方法对 MappedFile 进行预热。上述逻辑核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();
...
if (req.getMappedFile() == null) {
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(),
messageStore.getTransientStorePool());
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
...
// pre write mappedFile
if (mappedFile.getFileSize() >= getMapedFileSizeCommitLog()
&& isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(getFlushDiskType(),
getFlushLeastPagesWhenWarmMapedFile());
}
...
}
} catch (InterruptedException e) {
...
} catch (IOException e) {
...
} finally {
...
}
return true;
}
warmMappedFile 每间隔 OS_PAGE_SIZE 向 mappedByteBuffer 写入一个 0,此时对应页恰好产生一个缺页中断,操作系统为对应页分配物理内存。同时,如果刷盘策略为同步刷盘,需要对每页进行刷盘。最后,通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。warmMappedFile 核心代码精简如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// org.apache.rocketmq.store.MappedFile#warmMappedFile
public void warmMappedFile(FlushDiskType type, int pages) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
if (j % 1000 == 0) {
...
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
mappedByteBuffer.force();
}
this.mlock();
}
// org.apache.rocketmq.store.MappedFile#mlock
public void mlock() {
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
}
...
}