跳到主要内容

RocketMQ 核心原理

问题

RocketMQ 的架构是什么样的?有哪些特色功能?事务消息是如何实现的?

答案

RocketMQ 架构

核心概念

概念说明
NameServer轻量级注册中心,无状态,各节点互不通信
Broker消息存储和转发,Master 负责读写,Slave 只读备份
Topic消息的逻辑分类
MessageQueueTopic 的分片,类似 Kafka 的 Partition
Tag消息的二级分类,用于消费端过滤
Consumer Group消费者组,支持集群消费和广播消费
CommitLog所有消息顺序写入的物理文件(所有 Topic 混存)
ConsumeQueue消费队列索引,每个 MessageQueue 一个

存储模型

RocketMQ vs Kafka 存储差异
  • Kafka:每个 Partition 独立的 Log 文件,Topic 多时文件数多,随机写增多
  • RocketMQ:所有消息写入同一个 CommitLog,始终是顺序写;ConsumeQueue 只存索引信息

RocketMQ 的设计在 Topic 数量很多时(数千个),写入性能优于 Kafka。

ConsumeQueue 中每条索引固定 20 字节:offset(8B) + size(4B) + tagHashCode(8B)

事务消息

RocketMQ 的事务消息通过 半消息(Half Message)+ 事务回查 机制实现分布式事务的最终一致性。

事务消息示例
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务(如扣减库存、更新订单)
orderService.createOrder(msg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查:检查本地事务是否执行成功
boolean success = orderService.checkOrderExists(msg.getTransactionId());
return success ? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
});

// 发送事务消息
Message msg = new Message("order-topic", "创建订单".getBytes());
producer.sendMessageInTransaction(msg, null);

延迟消息

RocketMQ 原生支持延迟消息(开源版支持固定级别,商业版支持任意时间):

延迟消息
Message msg = new Message("topic", "body".getBytes());
// 延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 延迟 10s
producer.send(msg);

RocketMQ 5.x 开始支持任意时间的定时消息。

消费模式

模式说明
集群消费(Clustering)同组 Consumer 分摊消费,每条消息只被一个 Consumer 消费(默认)
广播消费(Broadcasting)每条消息发给组内所有 Consumer

消息过滤

Tag 过滤
// Producer 发送时指定 Tag
Message msg = new Message("order-topic", "TagA", "body".getBytes());

// Consumer 订阅时过滤 Tag
consumer.subscribe("order-topic", "TagA || TagB"); // 只消费 TagA 和 TagB

RocketMQ 还支持 SQL92 过滤,根据消息属性进行复杂条件过滤:

consumer.subscribe("order-topic", 
MessageSelector.bySql("amount > 100 AND region = 'shanghai'"));

常见面试问题

Q1: RocketMQ 的事务消息是如何实现的?

答案

通过 半消息 + 事务回查 两阶段机制:

  1. Producer 发送半消息(Half Message)到 Broker,此时消息对消费者不可见
  2. Broker 存储半消息成功后,Producer 执行本地事务
  3. 根据本地事务结果,向 Broker 发送 Commit(投递)或 Rollback(删除)
  4. 如果 Broker 未收到二次确认(网络异常、Producer 崩溃),会定时回查 Producer 的本地事务状态
  5. 回查默认 15 次,超过次数自动 Rollback

半消息存储在内部的 RMQ_SYS_TRANS_HALF_TOPIC,Commit 后才转移到真正的 Topic。

Q2: NameServer 和 ZooKeeper 有什么区别?为什么不用 ZK?

答案

对比NameServerZooKeeper
复杂度极简,无状态功能丰富,有状态
一致性最终一致(节点间不通信)强一致(ZAB 协议)
可用性节点互相独立,单点故障不影响Leader 选举期间不可用
性能几乎无瓶颈写入性能受限

RocketMQ 不需要强一致的元数据管理,NameServer 更轻量、更可靠。

Q3: RocketMQ 如何保证消息不丢失?

答案

阶段方案
Producer → Broker同步发送 + 重试机制 + 事务消息
Broker 存储同步刷盘(flushDiskType=SYNC_FLUSH)+ 同步复制(brokerRole=SYNC_MASTER
Broker → Consumer手动 ACK + 重试队列(消费失败自动重试 16 次)

高可靠配置:同步发送 + 同步刷盘 + 同步复制 + 手动 ACK。代价是吞吐量下降。

Q4: RocketMQ 的消息重试和死信队列是怎么工作的?

答案

  • 消费重试:Consumer 返回 RECONSUME_LATER 或抛出异常时,消息进入 重试队列(%RETRY%ConsumerGroup)
  • 默认重试 16 次,间隔递增:10s → 30s → 1m → 2m → ... → 2h
  • 超过最大重试次数的消息进入 死信队列(%DLQ%ConsumerGroup)
  • 死信队列中的消息需要人工处理或单独消费

相关链接