消息积压处理
问题
线上 MQ 出现大量消息积压,消费速度跟不上生产速度,怎么处理?
答案
排查流程
应急处理:快速消化积压
方案 1:扩容消费者
Kafka 扩容消费者
// Kafka 消费者数 ≤ 分区数
// 如果分区数不够,需先扩分区
// 注意:Kafka 扩分区不可逆
// 临时增加消费者实例数(K8s 快速扩副本)
// kubectl scale deployment consumer-service --replicas=10
方案 2:转发到临时队列
临时紧急消费:转发到更多队列
// 快速消费者:只做转发,不做业务处理
@KafkaListener(topics = "order-topic")
public void emergencyConsume(String message) {
// 转发到 N 个临时队列
int partition = message.hashCode() % 10;
kafkaTemplate.send("temp-topic-" + partition, message);
}
// 10 个临时队列各有独立消费者,并行处理
方案 3:跳过非核心消息
降级处理:跳过过期消息
@KafkaListener(topics = "notification-topic")
public void consume(ConsumerRecord<String, String> record) {
// 消息超过 1 小时的通知已无意义,直接丢弃
if (System.currentTimeMillis() - record.timestamp() > 3600_000) {
log.warn("消息过期,跳过: offset={}", record.offset());
return;
}
processNotification(record.value());
}
消费者性能优化
批量消费提升吞吐
// Kafka 批量消费
@KafkaListener(topics = "order-topic")
public void batchConsume(List<ConsumerRecord<String, String>> records) {
// 批量处理 + 批量写 DB
List<Order> orders = records.stream()
.map(r -> parseOrder(r.value()))
.collect(Collectors.toList());
orderMapper.batchInsert(orders); // 批量 insert 比逐条快 10x+
}
Kafka 批量消费配置
spring:
kafka:
consumer:
max-poll-records: 500 # 一次拉取 500 条
fetch-min-size: 1048576 # 至少拉 1MB
fetch-max-wait: 500ms # 最多等 500ms
listener:
type: batch # 开启批量消费模式
事后复盘:防止再次积压
| 措施 | 说明 |
|---|---|
| 监控告警 | 积压量超阈值立即告警 |
| 消费者扩缩容能力 | 支持快速水平扩展 |
| 生产端限流 | 异常流量时限制生产速率 |
| 死信队列 | 重试失败的消息进入死信,不阻塞正常消费 |
| 消息过期策略 | 设置 TTL,过期消息自动丢弃 |
常见面试问题
Q1: 消息积压的常见原因?
答案:
- 消费者挂了 / 消费者数量不足
- 消费者处理慢(DB 慢查询、下游服务超时、Full GC)
- 生产端流量暴增(大促、异常流量)
- 消费失败持续重试,卡住消费进度
Q2: Kafka 中消费者数量和分区数的关系?
答案:
- 同一消费者组中,消费者数量 ≤ 分区数
- 多余的消费者会空闲(不分配分区)
- 要提高并行度,需要先增加分区数
- Kafka 增加分区后不可减少
详见 Kafka 核心原理。
Q3: 消息消费失败怎么处理?
答案:
- 重试:3-5 次重试,指数退避
- 死信队列:超过重试次数的消息放入死信队列
- 人工处理:监控死信队列,人工排查后重新投递
- 重要消息:入库记录状态,补偿任务定时扫描
详见 消息可靠性。