MQ-Usage-Suggestion
前言
最近有球友问我:MQ的使用场景有哪些?工作中一定要使用MQ吗?
记得刚工作那会儿,我总是想不明白:为什么明明直接调用接口就能完成的功能,非要引入MQ这么个"中间商"?
直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后,我才真正理解了MQ的 价值。
今天我想和大家分享我在实际工作中使用消息队列(MQ)的10种典型场景,希望对你会有所帮助。
一、为什么需要消息队列(MQ)?
在深入具体场景之前,我们先来思考一个基本问题:为什么要使用消息队列?
系统间的直接调用:

引入消息队列后:

场景一:系统解耦
背景描述
在我早期参与的一个电商项目中,订单创建后需要通知多个系统:
// 早期的紧耦合设计
public class OrderService {
private InventoryService inventoryService;
private PointsService pointsService;
private EmailService emailService;
private AnalyticsService analyticsService;
public void createOrder(Order order) {
// 1. 保存订单
orderDao.save(order);
// 2. 调用库存服务
inventoryService.updateInventory(order);
// 3. 调用积分服务
pointsService.addPoints(order.getUserId(), order.getAmount());
// 4. 发送邮件通知
emailService.sendOrderConfirmation(order);
// 5. 记录分析数据
analyticsService.trackOrderCreated(order);
// 更多服务...
}
}
这种架构存在严重问题:
- 紧耦合 :订单服务需要知道所有下游服务
- 单点故障 :任何一个下游服务挂掉都会导致订单创建失败
- 性能瓶颈 :同步调用导致响应时间慢
MQ解决方案
引入MQ后,架构变为:

代码实现 :
// 订单服务 - 生产者
@Service
publicclass OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 1. 保存订单
orderDao.save(order);
// 2. 发送消息到MQ
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
);
}
}
// 库存服务 - 消费者
@Component
@RabbitListener(queues = "inventory.queue")
publicclass InventoryConsumer {
@Autowired
private InventoryService inventoryService;
@RabbitHandler
public void handleOrderCreated(OrderCreatedEvent event) {
inventoryService.updateInventory(event.getOrderId());
}
}
技术要点
- 消息协议选择 :根据业务需求选择RabbitMQ、Kafka或RocketMQ
- 消息格式 :使用JSON或Protobuf等跨语言格式
- 错误处理 :实现重试机制和死信队列
场景二:异步处理
背景描述
用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作 ,如果同步处理,用户需要等待很长时间。
MQ解决方案
// 视频服务 - 生产者
@Service
publicclass VideoService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public UploadResponse uploadVideo(MultipartFile file, String userId) {
// 1. 保存原始视频
String videoId = saveOriginalVideo(file);
// 2. 发送处理消息
kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
// 3. 立即返回响应
returnnew UploadResponse(videoId, "upload_success");
}
}
// 视频处理服务 - 消费者
@Service
publicclass VideoProcessingConsumer {
@KafkaListener(topics = "video-processing")
public void processVideo(VideoProcessingEvent event) {
// 异步执行耗时操作
videoProcessor.transcode(event.getVideoId());
videoProcessor.generateThumbnails(event.getVideoId());
contentModerationService.checkContent(event.getVideoId());
// 发送处理完成通知
notificationService.notifyUser(event.getUserId(), event.getVideoId());
}
}
架构优势
- 快速响应 :用户上传后立即得到响应
- 弹性扩展 :可以根据处理压力动态调整消费者数量
- 故障隔离 :处理服务故障不会影响上传功能