RocketMQ 核心原理
问题
RocketMQ 的架构是什么样的?有哪些特色功能?事务消息是如何实现的?
答案
RocketMQ 架构
核心概念
| 概念 | 说明 |
|---|---|
| NameServer | 轻量级注册中心,无状态,各节点互不通信 |
| Broker | 消息存储和转发,Master 负责读写,Slave 只读备份 |
| Topic | 消息的逻辑分类 |
| MessageQueue | Topic 的分片,类似 Kafka 的 Partition |
| Tag | 消息的二级分类,用于消费端过滤 |
| Consumer Group | 消费者组,支持集群消费和广播消费 |
| CommitLog | 所有消息顺序写入的物理文件(所有 Topic 混存) |
| ConsumeQueue | 消费队列索引,每个 MessageQueue 一个 |
存储模型
RocketMQ vs Kafka 存储差异
- Kafka:每个 Partition 独立的 Log 文件,Topic 多时文件数多,随机写增多
- RocketMQ:所有消息写入同一个 CommitLog,始终是顺序写;ConsumeQueue 只存索引信息
RocketMQ 的设计在 Topic 数量很多时(数千个),写入性能优于 Kafka。
ConsumeQueue 中每条索引固定 20 字节:offset(8B) + size(4B) + tagHashCode(8B)。
事务消息
RocketMQ 的事务消息通过 半消息(Half Message)+ 事务回查 机制实现分布式事务的最终一致性。
事务消息示例
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务(如扣减库存、更新订单)
orderService.createOrder(msg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查:检查本地事务是否执行成功
boolean success = orderService.checkOrderExists(msg.getTransactionId());
return success ? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
});
// 发送事务消息
Message msg = new Message("order-topic", "创建订单".getBytes());
producer.sendMessageInTransaction(msg, null);
延迟消息
RocketMQ 原生支持延迟消息(开源版支持固定级别,商业版支持任意时间):
延迟消息
Message msg = new Message("topic", "body".getBytes());
// 延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 延迟 10s
producer.send(msg);
RocketMQ 5.x 开始支持任意时间的定时消息。
消费模式
| 模式 | 说明 |
|---|---|
| 集群消费(Clustering) | 同组 Consumer 分摊消费,每条消息只被一个 Consumer 消费(默认) |
| 广播消费(Broadcasting) | 每条消息发给组内所有 Consumer |
消息过滤
Tag 过滤
// Producer 发送时指定 Tag
Message msg = new Message("order-topic", "TagA", "body".getBytes());
// Consumer 订阅时过滤 Tag
consumer.subscribe("order-topic", "TagA || TagB"); // 只消费 TagA 和 TagB
RocketMQ 还支持 SQL92 过滤,根据消息属性进行复杂条件过滤:
consumer.subscribe("order-topic",
MessageSelector.bySql("amount > 100 AND region = 'shanghai'"));
常见面试问题
Q1: RocketMQ 的事务消息是如何实现的?
答案:
通过 半消息 + 事务回查 两阶段机制:
- Producer 发送半消息(Half Message)到 Broker,此时消息对消费者不可见
- Broker 存储半消息成功后,Producer 执行本地事务
- 根据本地事务结果,向 Broker 发送 Commit(投递)或 Rollback(删除)
- 如果 Broker 未收到二次确认(网络异常、Producer 崩溃),会定时回查 Producer 的本地事务状态
- 回查默认 15 次,超过次数自动 Rollback
半消息存储在内部的 RMQ_SYS_TRANS_HALF_TOPIC,Commit 后才转移到真正的 Topic。
Q2: NameServer 和 ZooKeeper 有什么区别?为什么不用 ZK?
答案:
| 对比 | NameServer | ZooKeeper |
|---|---|---|
| 复杂度 | 极简,无状态 | 功能丰富,有状态 |
| 一致性 | 最终一致(节点间不通信) | 强一致(ZAB 协议) |
| 可用性 | 节点互相独立,单点故障不影响 | Leader 选举期间不可用 |
| 性能 | 几乎无瓶颈 | 写入性能受限 |
RocketMQ 不需要强一致的元数据管理,NameServer 更轻量、更可靠。
Q3: RocketMQ 如何保证消息不丢失?
答案:
| 阶段 | 方案 |
|---|---|
| Producer → Broker | 同步发送 + 重试机制 + 事务消息 |
| Broker 存储 | 同步刷盘(flushDiskType=SYNC_FLUSH)+ 同步复制(brokerRole=SYNC_MASTER) |
| Broker → Consumer | 手动 ACK + 重试队列(消费失败自动重试 16 次) |
高可靠配置:同步发送 + 同步刷盘 + 同步复制 + 手动 ACK。代价是吞吐量下降。
Q4: RocketMQ 的消息重试和死信队列是怎么工作的?
答案:
- 消费重试:Consumer 返回
RECONSUME_LATER或抛出异常时,消息进入 重试队列(%RETRY%ConsumerGroup) - 默认重试 16 次,间隔递增:10s → 30s → 1m → 2m → ... → 2h
- 超过最大重试次数的消息进入 死信队列(%DLQ%ConsumerGroup)
- 死信队列中的消息需要人工处理或单独消费