Rocket4.0-Knowledge-Architecture
本文将带您深入了解 RocketMQ 4.X 的核心知识体系,从架构设计到关键机制,一探这款高可用消息中间件的底层逻辑。
1 整体架构
RocketMQ 4.X 架构中包含四种角色 :
1、NameServer
名字服务是是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。它是一个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper ,支持 Broker 的动态注册与发现。
2、BrokerServer
Broker 主要负责消息的存储、投递和查询以及服务高可用保证 。
3、Producer
消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
4、Consumer
消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。

RocketMQ 集群工作流程:
1、启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer 、Consumer 连上来,相当于一个路由控制中心。
2、Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker信息( IP+端口等 )以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
3、收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
4、Producer 发送消息,启动时先 跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
5、Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
2 发布订阅模型
传统的消息队列 ActiveMQ 是典型的点对点模式。

- 在点对点模式中,消息发送者(生产者)将消息发送到一个特定的队列,而消息接收者(消费者)从该队列中接收消息。
- 消息在队列中存储,一旦一个消息被消费者接收,它就从队列中移除,这确保了每个消息只被一个消费者处理。
- 这种模式适用于一对一的通信,其中一个生产者向一个特定的消费者发送消息,确保消息的可靠传递和处理。
RocketMQ 和 Kafka 是发布订阅模式。

- 在发布订阅模式中,消息发送者将消息发布到一个主题(topic),而消息订阅者则订阅感兴趣的主题。
- 每个主题可以有多个订阅者,因此消息会被广播到所有订阅了相同主题的消费者。
- 这种模式适用于一对多或多对多的通信,允许多个消费者同时接收和处理相同主题的消息。
- 发布订 阅模式通常用于构建实时事件处理系统、日志处理、通知系统等,其中多个消费者需要订阅相同类型的消息并进行处理。
3 通讯框架

01 通讯协议
传输内容分为以下四个部分:
1、消息长度:
总长度,四个字节存储,占用一个 int 类型;
2、序列化类型 & 消息头长度:
占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
3、消息头数据:
经过序列化后的消息头数据;
4、消息主体数据:
消息主体的二进制字节数据内容。
消息头数据序列化默认是 JSON 格式 ,示例如下:

02 Reactor 模型
Reactor 线程模型抽象出三种组件:
- Reactor(反应器):Reactor 负责监听和分发事件,它是整个 Reactor 模型的调度中心。
- Acceptor(接收器):用于处理 IO 连接请求。
- Handlers(处理器):Handlers 负责具体的事件处理逻辑,即执行与事件相关的业务操作
Remoting 通讯框架采用了典型的主从多线程模型 ,但还是有变化,即:独立的业务线程池对应不同的请求业务类型。

一个 Reactor 主线程 ( eventLoopGroupBoss )责监听 TCP网络连接请求,建立好连接,创建 SocketChannel , 并注册到 selector 上。
RocketMQ 源码会自动根据 OS 的类型选择 NIO 和 Epoll ,也可以通过参数配置 ), 然后监听真正的网络数 据。
拿到网络数据后,再丢给 Worker 线程池( eventLoopGroupSelector ),再真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作都交给 defaultEventExecutorGroup 去做。
而业务操作由业务线程池中处理,根据 RemotingCommand 的业务请求编号 requestCode , 从处理器表 processorTable 这个本地缓存中找到对应的处理器 , 然后封装成 task 任务后,提交到对应的业务处理器的线程池执行。

RocketMQ 的线程模型如下所示 :
| 线程数 | 线程名 | 线程具体说明 |
|---|---|---|
| 1 | NettyBoss_%d | Reactor 主线程 |
| N | NettyServerEPOLLSelector_%d_%d | Reactor 线程池 |
| M1 | NettyServerCodecThread_%d | Worker线程池 |
| M2 | RemotingExecutorThread_%d | 业务 processor 处理线程池 |
3 文件存储机制
我们先进入 broker 的文件存储目录 。

消息存储和下面三个文件关系非常紧密:
-
数据文件
commitlog消息主体以及元数据的存储主体 ;
-
消费文件
consumequeue消息消费队列,引入的目的 主要是提高消息消费的性能 ;
-
索引文件 indexfile
索引文件,提供了一种可以通过 key 或时间区间来查询消息。

RocketMQ 采用的是混合型的存储结构,Broker 单个实例下所有的队列共用一个数据文件(commitlog)来存储。
生产者发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 commitlog 文件中。只要消息被刷盘持久化至磁盘文件 commitlog 中,那么生产者发送的消息就不会丢失。
Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件)。
4 高性能读写
01 顺序写
首先消息是一条一条写入到文件,每条消息的格式是固定的,这种设计对于文件读写来讲有两点优势:
磁盘的存取速度相对内存来讲并不快,一次磁盘 IO 的耗时主要取决于:寻道时间和盘片旋转时间,提高磁盘 IO 性能最有效的方法就是:减少随机 IO,增加顺序 IO 。

《 The Pathologies of Big Data 》这篇文章指出:内存随机读写的速度远远低于磁盘顺序读写的速度。磁盘 顺序写入速度可以达到几百兆/s,而随机写入速度只有几百 KB /s,相差上千倍。
因为消息是一条一条写入到 commitlog 文件 ,写入完成后,我们可以得到这条消息的物理偏移量。
每条消息的物理偏移量是唯一的, commitlog 文件名是递增的,可以根据消息的物理偏移量通过二分查找,定位消息位于那个文件中,并获取到消息实体数据。
02 内存映射机制
mmap 是 Linux 提供的一种内存映射文件的机制,它实现了将内核中读缓冲区地址与用户空间缓冲区地址进行映射,从而实现内核缓冲区与用户缓冲区的共享。

基于 mmap + write 系统调用的零拷贝方式,整个拷贝过程会发生 4 次上下文切换,1 次 CPU 拷贝和 2 次 DMA 拷贝。

用户程序读写数据的流程如下:
- 用户进程通过
mmap()函数向内核发起系统调用,上下文从用户态切换为内核态。 - 将用户进程的内核空间的读缓冲区与用户空间的缓存区进行内存地址映射。
- CPU 利用 DMA 控制器将数据从主存或硬盘拷贝到内核空间的读缓冲区。
- 上下文从内核态切换回用户态,
mmap系统调用执行返回。 - 用户进程通过
write()函数向内核发起系统调用,上下文从用户态切换为内核态。 - CPU 将读缓冲区中的数据拷贝到的网络缓冲区。
- CPU 利用 DMA 控制器将数据从网络缓冲区(
socket buffer)拷贝到网卡进行数据传输。 - 上下文从内核态切换回用户态,
write系统调用执行返回。
| 拷贝方式 | CPU拷贝 | DMA拷贝 | 系统调用 | 上下文切换 |
|---|---|---|---|---|
| 传统方式(read + write) | 2 | 2 | read / write4 | |
| 内存映射(mmap + write) | 1 | 2 | mmap / write | 4 |
| sendfile | 1 | 2 | sendfile | 2 |
| sendfile + DMA gather copy | 0 | 2 | sendfile | 2 |
RocketMQ 选择了 mmap + write 这种零拷贝方式,适用于业务级消息这种小块文件的数据持久化和传输;
而 Kafka 采用的是 sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输。
5 消费流程

核心流程如下:
-
消费者启动后,触发负载均衡服务 ,负载均衡服务为消费者实例分配对应的队列 ;
-
分配完队列后,负载均衡服务会为每个分配的新队列创建一个消息拉取请求
pullRequest, 拉取请求保存一个处理队列processQueue,内部是红黑树(TreeMap),用来保存拉取到的消息 ; -
拉取消息服务单线程从拉取请求队列
pullRequestQueue中弹出拉取消息,执行拉取任务 ,拉取请求是异步回调模式,将拉取到的消息放入到处理队列; -
拉取请求在一次拉取消息完成之后会复用,重新被放入拉取请求队列
pullRequestQueue中 ; -
拉取完成后,调用消费消息服务
consumeMessageService的submitConsumeRequest方法 ,消费消息服务内部有一个消费线程池; -
消费线程池的消费线程从消费任务队列中获取消费请求,执行消费监听器
listener.consumeMessage; -
消费完成后,若消费成功,则更新偏移量
updateOffset,先更新到内存offsetTable,定时上报到 Broker ;若消费失败,则将失败消费发送到 Broker 。 -
Broker 端接收到请求后, 调用消费进度管理器的
commitOffset方法修改内存的消费进度,定时刷盘到consumerOffset.json。
6 传统部署模式
01 双 Master 模式
所有节点都是 master 主节点(比如 2 个或 3 个主节点),没有 slave 从节点的模式。

该模式的优缺点如下: