跳到主要内容

消息积压处理

问题

线上 MQ 出现大量消息积压,消费速度跟不上生产速度,怎么处理?

答案

排查流程

应急处理:快速消化积压

方案 1:扩容消费者

Kafka 扩容消费者
// Kafka 消费者数 ≤ 分区数
// 如果分区数不够,需先扩分区
// 注意:Kafka 扩分区不可逆

// 临时增加消费者实例数(K8s 快速扩副本)
// kubectl scale deployment consumer-service --replicas=10

方案 2:转发到临时队列

临时紧急消费:转发到更多队列
// 快速消费者:只做转发,不做业务处理
@KafkaListener(topics = "order-topic")
public void emergencyConsume(String message) {
// 转发到 N 个临时队列
int partition = message.hashCode() % 10;
kafkaTemplate.send("temp-topic-" + partition, message);
}

// 10 个临时队列各有独立消费者,并行处理

方案 3:跳过非核心消息

降级处理:跳过过期消息
@KafkaListener(topics = "notification-topic")
public void consume(ConsumerRecord<String, String> record) {
// 消息超过 1 小时的通知已无意义,直接丢弃
if (System.currentTimeMillis() - record.timestamp() > 3600_000) {
log.warn("消息过期,跳过: offset={}", record.offset());
return;
}
processNotification(record.value());
}

消费者性能优化

批量消费提升吞吐
// Kafka 批量消费
@KafkaListener(topics = "order-topic")
public void batchConsume(List<ConsumerRecord<String, String>> records) {
// 批量处理 + 批量写 DB
List<Order> orders = records.stream()
.map(r -> parseOrder(r.value()))
.collect(Collectors.toList());

orderMapper.batchInsert(orders); // 批量 insert 比逐条快 10x+
}
Kafka 批量消费配置
spring:
kafka:
consumer:
max-poll-records: 500 # 一次拉取 500 条
fetch-min-size: 1048576 # 至少拉 1MB
fetch-max-wait: 500ms # 最多等 500ms
listener:
type: batch # 开启批量消费模式

事后复盘:防止再次积压

措施说明
监控告警积压量超阈值立即告警
消费者扩缩容能力支持快速水平扩展
生产端限流异常流量时限制生产速率
死信队列重试失败的消息进入死信,不阻塞正常消费
消息过期策略设置 TTL,过期消息自动丢弃

常见面试问题

Q1: 消息积压的常见原因?

答案

  • 消费者挂了 / 消费者数量不足
  • 消费者处理慢(DB 慢查询、下游服务超时、Full GC)
  • 生产端流量暴增(大促、异常流量)
  • 消费失败持续重试,卡住消费进度

Q2: Kafka 中消费者数量和分区数的关系?

答案

  • 同一消费者组中,消费者数量 ≤ 分区数
  • 多余的消费者会空闲(不分配分区)
  • 要提高并行度,需要先增加分区数
  • Kafka 增加分区后不可减少

详见 Kafka 核心原理

Q3: 消息消费失败怎么处理?

答案

  1. 重试:3-5 次重试,指数退避
  2. 死信队列:超过重试次数的消息放入死信队列
  3. 人工处理:监控死信队列,人工排查后重新投递
  4. 重要消息:入库记录状态,补偿任务定时扫描

详见 消息可靠性

相关链接