跳到主要内容

消息顺序性

问题

如何保证消息的顺序消费?全局有序和局部有序有什么区别?不同消息队列如何实现顺序消息?

答案

为什么会乱序

全局有序 vs 局部有序

类型说明代价适用场景
全局有序所有消息严格按发送顺序消费只能单分区单消费者,吞吐量极低几乎不用
局部有序同一业务 key 的消息有序同 key 路由到同一分区即可绝大多数场景
面试核心结论

实际业务中需要的是 局部有序:同一订单的消息有序(创建→支付→发货),不同订单之间不需要有序。

各 MQ 的顺序消息方案

Kafka

Kafka 保证 Partition 内有序。同一 key 的消息发到同一 Partition 即可保证顺序。

Kafka 顺序消息
// Producer:指定 key,相同 key 路由到同一 Partition
producer.send(new ProducerRecord<>("order-topic", orderId, message));
// Kafka 默认分区策略:hash(key) % partitionCount

// 额外配置:防止重试导致乱序
props.put("enable.idempotence", true); // 开启幂等(自动保证单分区内有序)
// 或者
props.put("max.in.flight.requests.per.connection", 1); // 同时只有一个未确认请求

Consumer 端保证顺序:

  • 一个 Partition 只分配给一个 Consumer(Consumer Group 自动保证)
  • 如果消费端有多线程处理,需要按 key 分发到同一线程
Consumer 多线程有序消费
// 按业务 key hash 分发到固定线程
ExecutorService[] workers = new ExecutorService[threadCount];
for (ConsumerRecord<String, String> record : records) {
int index = Math.abs(record.key().hashCode()) % threadCount;
workers[index].submit(() -> processMessage(record));
}

RocketMQ

RocketMQ 原生支持顺序消息,提供 MessageQueueSelectorMessageListenerOrderly

RocketMQ 顺序消息
// Producer:相同 orderId 发到同一 MessageQueue
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);

// Consumer:使用 MessageListenerOrderly(单线程顺序消费每个 Queue)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
processMessage(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
MessageListenerOrderly vs MessageListenerConcurrently
  • MessageListenerOrderly:每个 Queue 加锁,单线程顺序消费,保证顺序
  • MessageListenerConcurrently:多线程并发消费,不保证顺序但吞吐量高

RabbitMQ

RabbitMQ 单个 Queue 内天然有序(FIFO),保证顺序的关键是:

  1. 消息发到 同一个 Queue
  2. 该 Queue 只有 一个 Consumer(或 prefetchCount=1 串行处理)
RabbitMQ 顺序消费
// 设置 prefetchCount=1,一次只处理一条
channel.basicQos(1);

channel.basicConsume("order-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
processMessage(body);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});

顺序消息方案对比

维度KafkaRocketMQRabbitMQ
有序粒度Partition 内MessageQueue 内Queue 内
实现方式key hash + 单 Partition 单 ConsumerMessageQueueSelector + OrderlyListener单 Queue 单 Consumer
消费失败处理手动处理自动挂起当前 Queue 重试Nack + requeue
吞吐量影响中等中等较大

常见面试问题

Q1: 如何保证消息的顺序消费?

答案

核心思路:同一业务 key 的消息路由到同一分区/队列,由同一消费者按序消费

具体方案:

  1. Producer 端:按业务 key(如 orderId)hash 取模,发到同一分区
  2. Broker 端:分区内消息天然有序(append-only)
  3. Consumer 端:一个分区只分配给一个 Consumer;如果消费端多线程,按 key 分发到固定线程

Q2: 顺序消费时消费失败怎么办?

答案

顺序消费场景下,消费失败不能简单跳过(否则后续消息依赖前一条的结果就会出错)。

处理策略:

  1. 阻塞重试:当前消息失败后持续重试,直到成功(RocketMQ OrderlyListener 的默认行为)
  2. 有限重试 + 告警:重试 N 次后跳过并告警,人工介入
  3. 暂存 + 补偿:失败消息存入错误表,后续消息继续消费,最后统一补偿

RocketMQ 的做法:消费失败时挂起当前 Queue,间隔一段时间后重试同一条消息。

Q3: 全局有序消息怎么实现?

答案

将 Topic 的分区数设为 1,所有消息进入同一分区且只有一个 Consumer 消费。

代价极大:完全丧失并行能力,吞吐量取决于单个 Consumer 的处理速度。实际生产中几乎不用全局有序,而是用局部有序(按业务 key 分区)满足需求。

Q4: Kafka 的重试会导致消息乱序吗?

答案

会。当 max.in.flight.requests.per.connection > 1 时,第一批消息发送失败需要重试,而第二批消息已经发送成功,就会导致乱序。

解决方案:

  • 推荐:开启幂等 enable.idempotence=true(Kafka 0.11+),即使 max.in.flight = 5 也能保证单分区内有序
  • 备选:将 max.in.flight.requests.per.connection 设为 1,但会降低吞吐量

相关链接