参考: 分布式开放消息系统(RocketMQ)的原理与实践 RocketMQ实战与原理解析 [TOC]
整体架构
-
1.NameServer:NameServer之前相互独立,Broker、Producer、Consumer会定时向多个NameServer发送心跳,起到一个轻量级路由的作用。同时NameServer默认对所有的信息都是内存存储,不会持久化,即无状态。
-
2.Broker:接收Producer发送的消息做持久化存储,同时接收Consumer发送的消费请求,将消息发送给Consumer消费。
-
3.Producer:消息的生产者。
-
4.Consumer:消息的消费者,有Push、Pull两种类型。Push以长轮询的方式从Broker拉取消息消费,Pull则是由业务方主动调用Consumer拉取消息消费。 整体架构
以上四种角色都支持集群化部署
一、消息顺序
RocketMQ怎么实现发送顺序消息
要实现严格的顺序消息,简单且可行的办法就是:
保证生产者 - MQServer - 消费者是一对一对一的关系
将M1和M2发往同一个消费者,且发送M1后,需要消费端响应成功后才能发送M2。
生产者-MQ-消费者保证一一对应的模型
先发送M1, 客户端确认接收后,返回ACK MQ在接收到ACK后再发送M2,如果一定时间内没有接收到M1则重发。
这样的模型就严格保证消息的顺序。
代码实现
RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:
1
2
3
4
5
6
7
8
9
10
11
// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的肯定是同一个队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
private SendResult send() {
// 获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根据我们的算法,选择一个发送队列
// 这里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}
但是消费端1没有响应Server时有两种情况,
- 1.一种是M1确实没有到达(数据在网络传送中丢失),
- 2.另外一种消费端已经消费M1且已经发送响应消息,只是MQ Server端没有收到。
如果是第二种情况,重发M1,就会造成M1被重复消费,导致消息系统设计的第二个问题:消息的重复问题。
二、消息重复
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。
- 消费端处理消息的业务逻辑保持幂等性
- 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现,利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
三、事务消息
四、Producer如何发送消息
Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示: producer发送消息负载均衡
生产者发送消息的源码
1
2
3
4
5
6
7
8
9
10
11
12
13
// 构造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整个应用生命周期内,只需要初始化1次
producer.start();
// 构造Message
Message msg = new Message("TopicTest1",// topic
"TagA",// tag:给消息打标签,用于区分一类消息,可为null
"OrderID188",// key:自定义Key,可以用于去重,可为null
("Hello MetaQ").getBytes());// body:消息内容
// 发送消息并返回结果
SendResult sendResult = producer.send(msg);
// 清理资源,关闭网络连接,注销自己
producer.shutdown();
在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:
如果没有指定namesrv地址,将会自动寻址 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳… 启动负载均衡的服务
初始化完成后,开始发送消息,发送消息的主要代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
private SendResult sendDefaultImpl(Message msg,......) {
// 检查Producer的状态是否是RUNNING
this.makeSureStateOK();
// 检查msg是否合法:是否为null、topic,body是否为空、body是否超长
Validators.checkMessage(msg, this.defaultMQProducer);
// 获取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 从路由信息中选择一个消息队列
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
// 将消息发送到该队列上去
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。
如果Producer发送消息失败,会自动重试,重试的策略:
- 重试次数 < retryTimesWhenSendFailed(可配置)
- 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
- 同时满足上面两个条件后,Producer会选择另外一个队列发送消息
五、消息存储
RocketMQ的消息存储是由consume queue和commit log配合完成的。
1、Consume Queue
consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。
我们可以在配置中指定consumequeue与commitlog存储的目录
每个topic下的每个queue都有一个对应的consumequeue文件
,比如:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue文件组织,如图所示: Consume Queue文件组织示意图
根据topic和queueId来组织文件,图中
TopicA有两个队列0,1, 那么TopicA和QueueId=0组成一个ConsumeQueue, TopicA和QueueId=1组成另一个ConsumeQueue。
按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA。 按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA。
死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。
Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示: consumequeue文件存储单元格式
1.CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量 2.Size存储中消息的大小 3.Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
2、Commit Log
CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。 文件的默认位置如下,仍然可通过配置文件修改:
${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。 Commit Log存储单元结构图
3、CommitLog刷盘策略
Broker收到消息后,先将消息写到内存中CommitLog的ByteBuffer上,写入成功后,再根据刷盘策略将消息写到CommitLog对应的磁盘文件中,分为两种刷盘策略:
-
同步刷盘:内存写入成功后,同步将消息写入到磁盘上。只有当磁盘写入成功时,生产者才认为消息生产成功。这种模式的优点是消息的可靠性高,但降低了吞吐量。
-
异步刷盘:内存写入成功即认为消息生产成功,然后由后台线程根据偏移量异步写入到磁盘上。此时如果机器断电等内存丢失,会导致消息丢失。
4、消息存储实现
5、消息的索引文件
如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:
消息索引
索引文件主要用于根据key来查询消息的,流程主要是: 1.根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000) 2.根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项) 3.遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
六、消息读取
用磁盘来持久化消息可以实现消息的海量存储,但是消息的读写性能会受到一定影响。
mmap
1、应用程序先向内核发起read/write系统调用,进程从用户态切换到内核态 2、在物理内存中分配一块空间,通过DMA copy将磁盘内容拷贝内核态缓冲 3、通过CPU copy将内存中的数据从内核态缓冲拷贝到用户态缓冲(Java堆内存),此时进程从内核态又切换到用户态
写操作同样,依然要经过cpu copy ——> DMA copy,如下图(3、4两步即通过Socket进行网络传输)。
这样频繁的上下文切换对于高性能中间件来说,会严重的降低性能。为此,RocketMQ使用mmap系统调用将磁盘数据映射到进程虚拟地址空间上,用户态、内核态共用一个缓冲区(堆外内存),减少cpu copy。mmap的流程如下: mmap调用过程
①应用程序发起mmap系统调用
②进程切换到内核态,发起do_mmap_pgoff调用,该调用在进程的虚拟地址空间中分配一块区域用于映射磁盘数据存放地址,同时构建了一个VMA结构体指向该区域。此时只是建立了虚拟内存地址到磁盘数据物理存储地址的映射关系,并未将磁盘数据读到物理内存中。
③应用程序通过前面mmap返回的地址尝试读取磁盘数据时,由于此时并没有将任何磁盘数据copy到物理内存中(类似于懒加载),也没有建立虚拟内存地址到物理内存地址的映射关系,会导致触发PageFault硬盘缺页中断。
④缺页中断处理器访问磁盘,通过DMA copy将磁盘上的page cache项拷贝到物理内存中,并建立数据虚拟内存地址到物理内存地址的映射关系,此时应用程序就可以正常访问内存区域,不需要再经过CPU copy。
⑤通过mmap对文件进行写操作时,一定时间后系统会自动回写脏页面到对应磁盘地址。
mmap特点
优点:
①容量较大的文件来说(1.5~2G),采用mmap的方式其读/写的效率和性能都非常高
②内核空间的数据是所有进程共享的,使用mmap可以达到文件内容多进程共享
缺点:
①磁盘数据拷贝到堆外内存中,被其打开的文件只有在垃圾回收的才会被关闭,导致关闭时间不确定
②修改过的脏页面并不会立即更新回文件中,而是有一段时间的延迟
内存锁定
在读取CommitLog时,虽然可以通过PageCache提高目标消息直接在物理内存中读取的命中率。但是由于CommitLog存放的是所有Topic的消息,在读取时是随机访问,所以仍会出现缺页中断问题,导致内存被频繁换入换出。为此,RocketMQ使用了mlock系统调用,将mmap调用后所占用的堆外内存锁定,变为常驻内存,进一步让目标消息更多的在内存中读取。
文件预读
前面在介绍mmap系统调用时,提到mmap类似于懒加载,在要读取磁盘数据的时候才会通过DMA COPY拷贝到物理内存中。这样的硬盘缺页中断会造成一定延迟,RocketMQ可以通过配置,使程序启动的时候,通过madvise系统调用对文件进行预读,减少在程序运行时的硬盘缺页中断。
小结
相较于传统read,RocketMQ通过mmap、pageCache、文件预读、内存锁定等,提高了目标消息直接在内存读取的命中率,少部分可能会先通过DMA copy再进行读取,如下图所示。
六、消息订阅
RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。
首先看下消费端的负载均衡: 消费端负载均衡
消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载: 1.遍历Consumer下的所有topic,然后根据topic订阅所有的消息 2.获取同一topic和Consumer Group下的所有Consumer 3.然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等
如同上图所示:如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。这里采用的就是平均分配策略,它类似于分页的过程,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。
通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力。 消费端的Push模式是通过长轮询的模式来实现的,就如同下图: Push模式示意图
Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。
当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue
七 消息消费
消息过滤
RocketMQ的最佳实践中,建议每一个应用只有一个Topic,不同的消息类型通过tag区分。前面提到,RocketMQ由CommitLog、ConsumeQueue负责存储消息,而这两者都不关心消息tag,当生产者发送的消息被散列到各个MessageQueue后,RocketMQ为了防止下游消费方收到无用的消息,节约内存、带宽,在Consumer拉取消息时会先在Broker端的ConsumeQueue通过tag的hashCode进行消息过滤,tag的hashCode过滤成功后,考虑到两个tag的hashCode可能相同,会再对CommitLog中查询到的消息进行一次tag的匹配。
消息消费方式
Consumer在拉取到一定量消息后,对消息进行消费时,分为两种消费方式:Orderly、Concurrently。Orderly可以保证Consumer按照MessageQueue上的提交顺序对消息进行消费。而Concurrently利用多线程提高消费能力,会使消息消费无序。
八 定时消息
RocketMQ支持18种时间精度让Producer指定消息延迟消费的时长,但不支持任意时间精度 下图是Produer向TopicA下的两个MessageQueue发送消息,现在Producer发送了一个延迟精度为30min的延迟消息。 在RocketMQ内部,Broker在收到延迟消息后,先放到了一个名内“SCHEDULE_TOPIC_XXXX”的内部Topic,有多少种时间精度该Topic下就有多少MessageQueue,同时每一个MessageQueue又各自对应一个线程,类关系如下图所示。 每一个MessageQueue一个线程
SCHEDULE_TOPIC_XXXX下的MessageQueue负责暂存
Producer发送的消息,DeliverDelayedMessageTimerTask
根据计算的消息delay对自己负责的MessageQueue进行遍历,将达到消费时间的消息重新放入指定的Topic中,提供给下游Consumer消费。
xxoo、RocketMQ的其他特性
1. 2. 3.主动同步策略:同步双写、异步复制 4.海量消息堆积能力 5.高效通信 6……..
一、Producer最佳实践
1、一个应用尽可能用一个 Topic,消息子类型用 tags
来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
2、每个消息在业务层面的唯一标识码
,要设置到 keys
字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
3、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段
。
4、对于消息不可丢失应用,务必要有消息重发机制
。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
5、某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。
二、Consumer最佳实践
1、消费过程要做到幂等(即消费端去重)
2、尽量使用批量消费方式
,可以很大程度上提高消费吞吐量。
3、优化每条消息消费过程
三、其他配置
线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。 RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。 所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。
RocketMQ设计相关
RocketMQ的设计假定:
每台PC机器都可能宕机不可服务 任意集群都有可能处理能力不足 最坏的情况一定会发生 内网环境需要低延迟来提供最佳用户体验
RocketMQ的关键设计:
分布式集群化 强数据安全 海量数据堆积 毫秒级投递延迟(推拉模式)
这是RocketMQ在设计时的假定前提以及需要到达的效果。我想这些假定适用于所有的系统设计。随着我们系统的服务的增多,每位开发者都要注意自己的程序是否存在单点故障,如果挂了应该怎么恢复、能不能很好的水平扩展、对外的接口是否足够高效、自己管理的数据是否足够安全…… 多多规范自己的设计,才能开发出高效健壮的程序。