RocketMQ-Message-Type-Analysis
本文是基于RocketMQ 4.9版本讲解
RocketMQ 11种消息类型
- 普通消息
- 批量消息
- 顺序消息
- 重试消息
- 死信消息
- 同步消息
- 延迟消息
- 事务消息
- 请求-应答消息
- 异 步消息
- 单向消息
前置知识
为了帮助大家更好地理解这些消息底层的实现原理,这里我就通过三个问题来讲一讲RocketMQ最最基本的原理
如果有什么不解的,可以看看我之前写的RocketMQ消息短暂而又精彩的一生这篇文章
1、生产者如何发送消息
在RocketMQ中有两个重要的角色
- NameServer:就相当于一个注册中心
- Broker:RocketMQ服务端
当RocketMQ服务端,也就是Broker在启动的时候,会往NameServer注册自己的信息
这些信息其中就包括
- 当前Broker所在机器的ip和端口
- 当前Broker管理的Topic的名称以及每个Topic有几个队列
当生产者和消费者启动的时候,就会从NameServer拉取这些信息,这样生产者和消费者就可以通过NameServer中获取到Broker的ip和端口,跟Broker通信了
而Topic我们也都知道,是消息队列中一个很重要的概念,代表了一类消息的集合
在RocketMQ中,每个Topic默认都会有4个队列,并且每个队列都有一个id,默认从0开始,依次递增
topic
队列 id=0
队列 id=1
队列 id=2
队列 id=3
当生产者发送消息的时候,就会从消息所在Topic的队列中,根据一定的算法选择一个,然后携带这个队列的id(queueId),再发送给Broker
携带的队列的id就代表了这条消息属于这个队列的
所以从更细化的来说,消息虽然是在Topic底下,但是真正是分布在不同的队列上的,每个队列会有这个Topic下的部分消息。
2、消息存在哪
当消息被Broker接收到的时候,Broker会将消息存到本地的磁盘文件中,保证Broker重启之后消息也不丢失
RocketMQ给这个存消息的文件起了一个高大上的名字:CommitLog
由于消息会很多,所以为了防止文件过大,CommitLog在物理磁盘文件上被分为多个磁盘文件,每个文件默认的固定大小是1G
消息在写入到文件时,除了包含消息本身的内容数据,也还会包含其它信息,比如
- 消息的Topic
- 消息所在队列的id,前面提到过
- 消息生产者的ip和端口
- ...
这些数据会和消息本身按照一定的顺序同时写到CommitLog文件中
上图中黄色排列顺序和实际的存的内容并非实际情况,我只是举个例子
3、消费者如何消费消息
消费者是如何拉取消息的
在RocketMQ中,消息的消费单元是以队列来的
所以RocketMQ为了方便快速的查找和消费消息,会为每个Topic的每个队列也单独创建一个文件
RocketMQ给这个文件也起了一个高大上的名字:ConsumeQueue
当消息被存到CommitLog之后,其实还会往这条消息所在队列的ConsumeQueue文件中插一条数据
每个队列的ConsumeQueue也是由多个文件组成,每个文件默认是存30万条数据
插入ConsumeQueue中的每条数据由20个字节组成,包含3部分信息
- 消息在CommitLog的起始位置(8个字节)
- 消息在CommitLog存储的长度(8个字节)
- 消息tag的hashCode(4个字节)
每条数据也有自己的编号(offset),默认从0开始,依次递增
当消费者拉取消息的时候,会告诉服务端自己消费哪个队列(queueId),哪个位置的消息(offset)的消息
服务端接收到消息之后,会找到queueId对应的ConsumeQueue,然后找到offset位置的数据,最后根据这条数据到CommitLog文件查找真正的消息内容
所以,从这可以看出,ConsumeQueue其实就相当于是一个索引文件,方便我们快速查找在CommitLog中的消息
所以,记住下面这个非常重要的结论,有助于后面的文章内容的理解
要想查找到某个Topic下的消息,那么一定是先找这个Topic队列对应的ConsumeQueue,之后再通过ConsumeQueue中的数据去CommitLog文件查找真正的消息内容
消费者组和消费模式
在RocketMQ,消费者是有个消费者组的概念,在启动消费者的时候会指定该消费者属于哪个消费者组。
//创建一个消费者,指定消费者组的名称为sanyouConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
一个消费者组中可以有多个消费者,不同消费者组之间消费消息是互不干扰的
在同一个消费者组中,消息消费有两种模式
- 集群模式
- 广播模式
同一条消息在同一个消费者组底下只会被消费一次,这就叫集群模式
集群消费的实现就是将队列按照一定的算法分配给消费者,默认是按照平均分配的
图片 广播模式刚好相反,同一条消息能被同一个消费者组底下所有的消费者消费一次
图片 RocketMQ默认是集群模式,如果你想用广播模式,只需设置一下即可
consumer.setMessageModel(MessageModel.BROADCASTING);
好了,到这就讲完了前置知识,这些前置知识后面或多或少都有提到
如果你觉得看的不过瘾,更详细的文章奉上RocketMQ消息短暂而又精彩的一生
普通消息
普通消息其实就很简单,如下面代码所示,就是发送一条普通的消息
public class Producer {
public static void main(String[] args) throws Exception {
//创建一个生产者,指定生产者组为 sanyouProducer
DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
// 指定NameServer的地址
producer.setNamesrvAddr("192.168.200.143:9876");
// 启动生产者
producer.start();
//创建一条消息 topic为 sanyouTopic 消息内容为 三友的java日记
Message msg = new Message("sanyouTopic", "三友的java日记".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
构建的消息的topic为sanyouTopic,内容为三友的java日记,这就是一条很普通的消息
批量消息
批量消息从名字也可以看出来,就是将多个消息同时发过去,减少网络请求的次数
public class Producer {
public static void main(String[] args) throws Exception {
//创建一个生产者,指定生产者组为 sanyouProducer
DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
// 指定NameServer的地址
producer.setNamesrvAddr("192.168.200.143:9876");
// 启动生产者
producer.start();
//用以及集合保存多个消息
List<Message> messages = new ArrayList<>();
messages.add(new Message("sanyouTopic", "三友的java日记 0".getBytes()));
messages.add(new Message("sanyouTopic", "三友的java日记 1".getBytes()));
messages.add(new Message("sanyouTopic", "三友的java日记 2".getBytes()));
// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(messages);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
多个普通消息同时发送,这就是批量消息
不过在使用批量消息的时候,需要注意以下两点
- 每条消息的Topic必须都得是一样的
- 不支持延迟消息和事务消息
普通消息和批量消息比较简单,没有复杂的逻辑,就是将消息发送过去,在ConsumeQueue和CommitLog存上对应的数据就可以了
顺序消息
所谓的顺序消息就是指
生产者发送消息的顺序跟消费者消费消息的顺序是一致的
RocketMQ可以保证同一个队列的消息绝对顺序,先进入队列的消息会先被消费者拉取到,但是无法保证一个Topic内消息的绝对顺序
所以要想通过RocketMQ实现顺序消费,需要保证两点
- 生产者将需要保证顺序的消息发送到同一个队列
- 消费者按照顺序消费拉取到的消息
那么,第一个问题,如何消息发送到同一个队列
前面有提到,RocketMQ发送消息的时候会选择一个队列进行发送
而RocketMQ默认是通过轮询算法来选择队列的,这就无法保证需要顺序消费的消息会存到同一个队列底下
所以,默认情况下是不行了,我们需要自定义队列的选择算法,才能保证消息都在同一个队列中
RocketMQ提供了自定义队列选择的接口MessageQueueSelector
比如我们可以实现这个接口,保证相同订单id的消息都选择同一个队列,在消息发送的时候指定一下就可以了
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//可以根据业务的id从mqs中选择一个队列
return null;
}
}, new Object());
保证消息顺序发送之后,第二个问题,消费者怎么按照顺序消费拉取到的消息?
这个问题RocketMQ已经考虑到了,看看RocketMQ多么地贴心
RocketMQ在消费消息的时候,提供了两种方式:
- 并发消费
- 顺序消费
并发消费,多个线程同时处理同一个队列拉取到的消息
顺序消费,同一时间只有一个线程会处理同一个队列拉取到的消息
至于是并发消费还是顺序消费,需要我们自己去指定
对于顺序处理,只需要实现MessageListenerOrderly接口,处理消息就可以了
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
// 指定NameServer的地址
consumer.setNamesrvAddr("192.168.200.143:9876");
// 订阅sanyouTopic这个topic下的所有的消息
consumer.subscribe("sanyouTopic", "*");
// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
如果想并发消费,换成实现MessageListenerConcurrently即可
到这你可能会有一个疑问
并发消费和顺序消费跟前面提到的集群消费和广播消费有什么区别?
集群消费和广播消费指的是一个消费者组里的每个消费者是去拉取全部队列的消息还是部分队列的消息,也就是选择需要拉取的队列
而并发和顺序消费的意思是,是对已经拉到的同一个队列的消息,是并发处理还是按照消息的顺序去处理
延迟消息
延迟消息就是指生产者发送消息之后,消息不会立马被消费,而是等待一定的时间之后再被消息
RocketMQ的延迟消息用起来非常简单,只需要在创建消息的时候指定延迟级别,之后这条消息就成为延迟消息了
Message message = new Message("sanyouTopic", "三友的java日记 0".getBytes());
//延迟级别
message.setDelayTimeLevel(1);
虽然用起来简单,但是背后的实现原理还是有点意思,我们接着往下看
RocketMQ延迟消息的延迟时间默认有18个级别,不同的延迟级别对应的延迟时间不同
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
RocketMQ内部有一个Topic,专门用来表示是延迟消息 的,叫SCHEDULE_TOPIC_XXXX,XXXX不是占位符,就是XXXX
RocketMQ会根据延迟级别的个数为SCHEDULE_TOPIC_XXXX这个Topic创建相对应数量的队列
比如默认延迟级别是18,那么SCHEDULE_TOPIC_XXXX就有18个队列,队列的id从0开始,所以延迟级别为1时,对应的队列id就是0,为2时对应的就是1,依次类推
图片
那SCHEDULE_TOPIC_XXXX这个Topic有什么作用呢?
这就得从消息存储时的一波偷梁换柱的骚操作了说起了
当服务端接收到消息的时候,判断延迟级别大于0的时候,说明是延迟消息,此时会干下面三件事:
- 将消息的Topic改成SCHEDULE_TOPIC_XXXX
- 将消息的队列id设置为延迟级别对应的队列id
- 将消息真正的Topic和队列id存到前面提到的消息存储时的额外信息中
之后消息就按照正常存储的步骤存到CommitLog文件中
由于消息存到的是SCHEDULE_TOPIC_XXXX这个Topic中,而不是消息真正的目标Topic中,所以消费者此时是消费不到消息的
举个例子,比如有条消息,Topic为sanyou,所在的队列id = 1,延迟级别 = 1,那么偷梁换柱之后的结果如下图所示
代码如下
// init gueueld msg.getQueueldOr
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
//延迟消息的处理,将延迟消息放到中转的延迟队列中保存
if (msg.getDelayTimeLevel() > 0) { // 延迟消息
if (msg.getDelayTimelevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel())
}
// Topic设置为 RMQ_SYS_SCHEDULE_TOPIC
// 根据延迟级别找到对应的队列 id
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
int queueid = ScheduleessageService.delayLevel2QueueId(msg.getDelayTimeLevel())
// Backup real topic,queueId
// 将真正的 Topic和队列id存到额外信息中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.value0f(msg.getQueueId())
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())));
// 真正设置
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
所以从上分析可以得出一个结论
所有RocketMQ的延迟消息,最终都会存储到
SCHEDULE_TOPIC_XXXX这个Topic中,并且同一个延迟级别的消息在同一个队列中
在存消息偷梁换柱之后,实现延迟消费的最关键的一个步骤来了
BocketMQ在启动的时候,除了为每个延迟级别创建一个队列之后,还会为每个延迟级别创建一个延迟任务,也就相当于一个定时任务,每隔100ms执行一次
这个延迟任务会去检查这个队列中的消息有没有到达延迟时间,也就是不是可以消费了
前面的结论,每个队列都有一个ConsumeQueue文件,可以通过ConsumeQueue找到这个队列中的消息
一旦发现到达延迟时间,可以消费了,此时就会从这条消息额外存储的消息中拿到真正的Topic和队列id,重新构建一条新的消息,将新的消息的Topic和队列id设置成真正的Topic和队列id,内容还是原来消息的内容
之后再一次将新构建的消息存储到CommitLog中
由于新消息的Topic变成消息真正的Topic了,所以之后消费者就能够消费到这条消息了
所以,从整体来说,RocketMQ延迟消息的实现本质上就是最开始消息是存在SCHEDULE_TOPIC_XXXX这个中转的Topic中
然后会有一个类似定时任务的东西,不停地去找到这个Topic中的消息
一旦发现这个消息达到了延迟任务,说明可以消费了,那么就重新构建一条消息,这条消息的Topic和队列id都是实际上的Topic和队列id,然后存到CommitLog
之后消费者就能够在目标的Topic获取到消息了