RocketMQ-Message-Lifecycle
大家好,我是三友~~
这篇文章我准备来聊一聊RocketMQ消息的一生。
不知你是否跟我一样,在使用RocketMQ的时候也有很多的疑惑:
- 消息是如何发送的,队列是如何选择的?
- 消息是如何存储的,是如何保证读写的高性能?
- RocketMQ是如何实现消息的快速查找的?
- RocketMQ是如何实现高可用的?
- 消息是在什么时候会被清除?
- ...
本文就通 过探讨上述问题来探秘消息在RocketMQ中短暂而又精彩的一生。
如果你还没用过RocketMQ,可以看一下这篇文章RocketMQ保姆级教程
核心概念
-
NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
-
Broker:核心的一个角色,主要是用来保存消息的,在启动时会向NameServer进行注册。Broker实例可以有很多个,相同的BrokerName可以称为一个Broker组,每个Broker组只保存一部分消息。
-
topic:可以理解为一个消息的集合的名字,一个topic可以分布在不同的Broker组下。
-
队列(queue):一个topic可以有很多队列,默认是一个topic在同一个Broker组中是4个。如果一个topic现在在2个Broker组中,那么就有可能有8个队列。
-
生产者:生产消息的一方就是生产者
-
生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组
-
消费者:用来消费生产者消息的一方
-
消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。
消息诞生与发送
我们都知道,消息是由业务系统在运行过程产生的,当我们的业务系统产生了消息,我们就可以调用RocketMQ提供的API向RocketMQ发送消息,就像下面这样
DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
//指定NameServer的地址
producer.setNamesrvAddr("localhost:9876");
//启动生产者
producer.start();
//省略代码。。
Message msg = new Message("sanyouTopic", "TagA", "三友的java日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(msg);
虽然代码很简单,我们不经意间可能会思考如下问题:
- 代码中只设置了
NameServer的地址,那么生产者是如何知道Broker所在机器的地址,然后向Broker发送消息的? - 一个topic会有很多队列,那么生产者是如何选择哪个队列发送消息?
- 消息一旦发送失败了怎么办?
路由表
当Broker在启动的过程中,Broker就会往NameServer注册自己这个Broker的信息,这些信息就包括自身所在服务器的ip和端口,还有就是自己这个Broker有哪些topic和对应的队列信息,这些信息就是路由信息,后面就统一称为路由表。
Broker向NameServer注册
当生产者启动的时候,会从NameServer中拉取到路由表,缓存到本地,同时会开启一个定时任务,默认是每隔30s从NameServer中重新拉取路由信息,更新本地缓存。
队列的选择
好了通过上一节我们就明白了,原来生产者会从NameServer拉取到Broker的路由表的信息,这样生产者就知道了topic对应的队列的信息了。
但是由于一个topic可能会有很多的队列,那么应该将消息发送到哪个队列上呢?
图片 面对这种情况,RocketMQ提供了两种消 息队列的选择算法。
- 轮询算法
- 最小投递延迟算法
轮询算法 就是一个队列一个队列发送消息,这些就能保证消息能够均匀分布在不同的队列底下,这也是RocketMQ默认的队列选择算法。
但是由于机器性能或者其它情况可能会出现某些Broker上的Queue可能投递延迟较严重,这样就会导致生产者不能及时发消息,造成生产者压力过大的问题。所以RocketMQ提供了最小投递延迟算法。
最小投递延迟算法 每次消息投递的时候会统计投递的时间延迟,在选择队列的时候会优先选择投递延迟时间小的队列。这种算法可能会导致消息分布不均匀的问题。
如果你想启用最小投递延迟算法,只需要按如下方法设置一下即可。
producer.setSendLatencyFaultEnable(true);
当然除了上述两种队列选择算法之外,你也可以自定义队列选择 算法,只需要实现MessageQueueSelector接口,在发送消息的时候指定即可。
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//从mqs中选择一个队列
return null;
}
}, new Object());
MessageQueueSelector RocketMQ也提供了三种实现
- 随机算法
- Hash算法