跳到主要内容

MQ-Usage-Suggestion

前言

最近有球友问我:MQ的使用场景有哪些?工作中一定要使用MQ吗?

记得刚工作那会儿,我总是想不明白:为什么明明直接调用接口就能完成的功能,非要引入MQ这么个"中间商"?

直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后,我才真正理解了MQ的价值。

今天我想和大家分享我在实际工作中使用消息队列(MQ)的10种典型场景,希望对你会有所帮助。

一、为什么需要消息队列(MQ)?

在深入具体场景之前,我们先来思考一个基本问题:为什么要使用消息队列?

系统间的直接调用:

引入消息队列后:

场景一:系统解耦

背景描述

在我早期参与的一个电商项目中,订单创建后需要通知多个系统:

// 早期的紧耦合设计
public class OrderService {
    private InventoryService inventoryService;
    private PointsService pointsService;
    private EmailService emailService;
    private AnalyticsService analyticsService;
    
    public void createOrder(Order order) {
        // 1. 保存订单
        orderDao.save(order);
        
        // 2. 调用库存服务
        inventoryService.updateInventory(order);
        
        // 3. 调用积分服务
        pointsService.addPoints(order.getUserId(), order.getAmount());
        
        // 4. 发送邮件通知
        emailService.sendOrderConfirmation(order);
        
        // 5. 记录分析数据
        analyticsService.trackOrderCreated(order);
        
        // 更多服务...
    }
}

这种架构存在严重问题:

  • 紧耦合 :订单服务需要知道所有下游服务
  • 单点故障 :任何一个下游服务挂掉都会导致订单创建失败
  • 性能瓶颈 :同步调用导致响应时间慢

MQ解决方案

引入MQ后,架构变为:

代码实现 :

// 订单服务 - 生产者
@Service
publicclass OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void createOrder(Order order) {
        // 1. 保存订单
        orderDao.save(order);
        
        // 2. 发送消息到MQ
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.created",
            new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())
        );
    }
}

// 库存服务 - 消费者
@Component
@RabbitListener(queues = "inventory.queue")
publicclass InventoryConsumer {
    @Autowired
    private InventoryService inventoryService;
    
    @RabbitHandler
    public void handleOrderCreated(OrderCreatedEvent event) {
        inventoryService.updateInventory(event.getOrderId());
    }
}

技术要点

  • 消息协议选择 :根据业务需求选择RabbitMQ、Kafka或RocketMQ
  • 消息格式 :使用JSON或Protobuf等跨语言格式
  • 错误处理 :实现重试机制和死信队列

场景二:异步处理

背景描述

用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作,如果同步处理,用户需要等待很长时间。

MQ解决方案

// 视频服务 - 生产者
@Service
publicclass VideoService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public UploadResponse uploadVideo(MultipartFile file, String userId) {
        // 1. 保存原始视频
        String videoId = saveOriginalVideo(file);
        
        // 2. 发送处理消息
        kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));
        
        // 3. 立即返回响应
        returnnew UploadResponse(videoId, "upload_success");
    }
}

// 视频处理服务 - 消费者
@Service
publicclass VideoProcessingConsumer {
    @KafkaListener(topics = "video-processing")
    public void processVideo(VideoProcessingEvent event) {
        // 异步执行耗时操作
        videoProcessor.transcode(event.getVideoId());
        videoProcessor.generateThumbnails(event.getVideoId());
        contentModerationService.checkContent(event.getVideoId());
        
        // 发送处理完成通知
        notificationService.notifyUser(event.getUserId(), event.getVideoId());
    }
}

架构优势

  • 快速响应 :用户上传后立即得到响应
  • 弹性扩展 :可以根据处理压力动态调整消费者数量
  • 故障隔离 :处理服务故障不会影响上传功能

场景三:流量削峰

背景描述

电商秒杀活动时,瞬时流量可能是平时的百倍以上,直接冲击数据库和服务。

MQ解决方案

代码实现 :

// 秒杀服务
@Service
publicclass SecKillService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public SecKillResponse secKill(SecKillRequest request) {
        // 1. 校验用户资格
        if (!checkUserQualification(request.getUserId())) {
            return SecKillResponse.failed("用户无资格");
        }
        
        // 2. 预减库存(Redis原子操作)
        Long remaining = redisTemplate.opsForValue().decrement(
            "sec_kill_stock:" + request.getItemId());
        
        if (remaining == null || remaining < 0) {
            // 库存不足,恢复库存
            redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());
            return SecKillResponse.failed("库存不足");
        }
        
        // 3. 发送秒杀成功消息到MQ
        rabbitTemplate.convertAndSend(
            "sec_kill.exchange",
            "sec_kill.success",
            new SecKillSuccessEvent(request.getUserId(), request.getItemId())
        );
        
        return SecKillResponse.success("秒杀成功");
    }
}

// 订单处理消费者
@Component
@RabbitListener(queues = "sec_kill.order.queue")
publicclass SecKillOrderConsumer {
    @RabbitHandler
    public void handleSecKillSuccess(SecKillSuccessEvent event) {
        // 异步创建订单
        orderService.createSecKillOrder(event.getUserId(), event.getItemId());
    }
}

技术要点

  • 库存预扣 :使用Redis原子操作避免超卖
  • 队列缓冲 :MQ缓冲请求,避免直接冲击数据库
  • 限流控制 :在网关层进行限流,拒绝过多请求

场景四:数据同步

背景描述

在微服务架构中,不同服务有自己的数据库,需要保证数据一致性。

MQ解决方案

// 用户服务 - 数据变更时发送消息
@Service
publicclass UserService {
    @Transactional
    public User updateUser(User user) {
        // 1. 更新数据库
        userDao.update(user);
        
        // 2. 发送消息(在事务内)
        rocketMQTemplate.sendMessageInTransaction(
            "user-update-topic",
            MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))
                .build(),
            null
        );
        
        return user;
    }
}

// 其他服务 - 消费用户更新消息
@Service
@RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")
publicclass UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {
    @Override
    public void onMessage(UserUpdateEvent event) {
        // 更新本地用户信息缓存
        orderService.updateUserCache(event.getUserId(), event.getStatus());
    }
}

一致性保证

  • 本地事务表 :将消息和业务数据放在同一个数据库事务中
  • 事务消息 :使用RocketMQ的事务消息机制
  • 幂等消费 :消费者实现幂等性,避免重复处理

场景五:日志收集

背景描述

分布式系统中,日志分散在各个节点,需要集中收集和分析。

MQ解决方案

代码实现 :

// 日志收集组件
@Component
publicclass LogCollector {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void collectLog(String appId, String level, String message, Map<String, Object> context) {
        LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());
        
        // 发送到Kafka
        kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));
    }
}

// 日志消费者
@Service
publicclass LogConsumer {
    @KafkaListener(topics = "app-logs", groupId = "log-es")
    public void consumeLog(String message) {
        LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);
        
        // 存储到Elasticsearch
        elasticsearchService.indexLog(logEvent);
        
        // 实时监控检查
        if ("ERROR".equals(logEvent.getLevel())) {
            alertService.checkAndAlert(logEvent);
        }
    }
}

技术优势

  • 解耦 :应用节点无需关心日志如何处理
  • 缓冲 :应对日志产生速率波动
  • 多消费 :同一份日志可以被多个消费者处理

场景六:消息广播

背景描述

系统配置更新后,需要通知所有服务节点更新本地配置。

MQ解决方案

// 配置服务 - 广播配置更新
@Service
publicclass ConfigService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void updateConfig(String configKey, String configValue) {
        // 1. 更新配置存储
        configDao.updateConfig(configKey, configValue);
        
        // 2. 广播配置更新消息
        redisTemplate.convertAndSend("config-update-channel", 
            new ConfigUpdateEvent(configKey, configValue));
    }
}

// 服务节点 - 订阅配置更新
@Component
publicclass ConfigUpdateListener {
    @Autowired
    private LocalConfigCache localConfigCache;
    
    @RedisListener(channel = "config-update-channel")
    public void handleConfigUpdate(ConfigUpdateEvent event) {
        // 更新本地配置缓存
        localConfigCache.updateConfig(event.getKey(), event.getValue());
    }
}

应用场景

  • 功能开关 :动态开启或关闭功能
  • 参数调整 :调整超时时间、限流阈值等
  • 黑白名单 :更新黑白名单配置

场景七:顺序消息

背景描述

在某些业务场景中,消息的处理顺序很重要,如订单状态变更。

MQ解决方案

// 订单状态变更服务
@Service
publicclass OrderStateService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void changeOrderState(String orderId, String oldState, String newState) {
        OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
        
        // 发送顺序消息,使用orderId作为sharding key
        rocketMQTemplate.syncSendOrderly(
            "order-state-topic", 
            event, 
            orderId  // 保证同一订单的消息按顺序处理
        );
    }
}

// 订单状态消费者
@Service
@RocketMQMessageListener(
    topic = "order-state-topic",
    consumerGroup = "order-state-group",
    consumeMode = ConsumeMode.ORDERLY  // 顺序消费
)
publicclass OrderStateConsumer implements RocketMQListener<OrderStateEvent> {
    @Override
    public void onMessage(OrderStateEvent event) {
        // 按顺序处理订单状态变更
        orderService.processStateChange(event);
    }
}

顺序保证机制

  • 分区顺序 :同一分区内的消息保证顺序
  • 顺序投递 :MQ保证消息按发送顺序投递
  • 顺序处理 :消费者顺序处理消息

场景八:延迟消息

背景描述

需要实现定时任务,如订单超时未支付自动取消。

MQ解决方案

// 订单服务 - 发送延迟消息
@Service
publicclass OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void createOrder(Order order) {
        // 保存订单
        orderDao.save(order);
        
        // 发送延迟消息,30分钟后检查支付状态
        rabbitTemplate.convertAndSend(
            "order.delay.exchange",
            "order.create",
            new OrderCreateEvent(order.getId()),
            message -> {
                message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟
                return message;
            }
        );
    }
}

// 订单超时检查消费者
@Component
@RabbitListener(queues = "order.delay.queue")
publicclass OrderTimeoutConsumer {
    @RabbitHandler
    public void checkOrderPayment(OrderCreateEvent event) {
        Order order = orderDao.findById(event.getOrderId());
        if ("UNPAID".equals(order.getStatus())) {
            // 超时未支付,取消订单
            orderService.cancelOrder(order.getId(), "超时未支付");
        }
    }
}

替代方案对比

方案优点缺点
数据库轮询实现简单实时性差,数据库压力大
延时队列实时性好实现复杂,消息堆积问题
定时任务可控性强分布式协调复杂

场景九:消息重试

背景描述

处理消息时可能遇到临时故障,需要重试机制保证最终处理成功。MQ解决方案

// 消息消费者 with 重试机制
@Service
@Slf4j
publicclass RetryableConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @RabbitListener(queues = "business.queue")
    public void processMessage(Message message, Channel channel) {
        try {
            // 业务处理
            businessService.process(message);
            
            // 确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (TemporaryException e) {
            // 临时异常,重试
            log.warn("处理失败,准备重试", e);
            
            // 拒绝消息,requeue=true
            channel.basicNack(
                message.getMessageProperties().getDeliveryTag(),
                false,
                true// 重新入队
            );
            
        } catch (PermanentException e) {
            // 永久异常,进入死信队列
            log.error("处理失败,进入死信队列", e);
            
            channel.basicNack(
                message.getMessageProperties().getDeliveryTag(),
                false,
                false// 不重新入队
            );
        }
    }
}

重试策略

  • 立即重试 :临时故障立即重试
  • 延迟重试 :逐步增加重试间隔
  • 死信队列 :最终无法处理的消息进入死信队列

场景十:事务消息

背景描述

分布式系统中,需要保证多个服务的数据一致性。MQ解决方案

// 事务消息生产者
@Service
publicclass TransactionalMessageService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Transactional
    public void createOrderWithTransaction(Order order) {
        // 1. 保存订单(数据库事务)
        orderDao.save(order);
        
        // 2. 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-tx-topic",
            MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
                .build(),
            order  // 事务参数
        );
        
        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            thrownew RuntimeException("事务消息发送失败");
        }
    }
}

// 事务消息监听器
@Component
@RocketMQTransactionListener
publicclass OrderTransactionListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderDao orderDao;
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 检查本地事务状态
            Order order = (Order) arg;
            Order existOrder = orderDao.findById(order.getId());
            
            if (existOrder != null && "CREATED".equals(existOrder.getStatus())) {
                return RocketMQLocalTransactionState.COMMIT_MESSAGE;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 回查本地事务状态
        String orderId = (String) msg.getHeaders().get("order_id");
        Order order = orderDao.findById(orderId);
        
        if (order != null && "CREATED".equals(order.getStatus())) {
            return RocketMQLocalTransactionState.COMMIT_MESSAGE;
        } else {
            return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

事务消息流程

总结

通过以上10个场景,我们可以总结出MQ使用的核心原则:

适用场景

  • 异步处理 :提升系统响应速度
  • 系统解耦 :降低系统间依赖
  • 流量削峰 :应对突发流量
  • 数据同步 :保证最终一致性
  • 分布式事务 :解决数据一致性问题

技术选型建议

场景推荐MQ原因
高吞吐Kafka高吞吐量,持久化存储
事务消息RocketMQ完整的事务消息机制
复杂路由RabbitMQ灵活的路由配置
延迟消息RabbitMQ原生支持延迟队列

最佳实践

  1. 消息幂等性 :消费者必须实现幂等处理
  2. 死信队列 :处理失败的消息要有兜底方案
  3. 监控告警 :完善的消息堆积监控和告警
  4. 性能优化 :根据业务特点调整MQ参数