RabbitMQ 核心原理
问题
RabbitMQ 的架构和核心概念是什么?Exchange 有几种类型?如何保证消息可靠投递?
答案
RabbitMQ 架构
核心概念
| 概念 | 说明 |
|---|---|
| Producer | 消息生产者,将消息发送到 Exchange |
| Exchange | 交换机,根据路由规则将消息分发到 Queue |
| Queue | 消息队列,存储消息,FIFO |
| Binding | Exchange 与 Queue 之间的绑定关系 + 路由键 |
| Consumer | 消息消费者,从 Queue 中消费消息 |
| Virtual Host | 虚拟主机,逻辑隔离(类似 MySQL 的 Database) |
| Connection | TCP 长连接 |
| Channel | 连接内的虚拟通道,复用 TCP 连接 |
Exchange 四种类型
| 类型 | 路由规则 | 适用场景 |
|---|---|---|
| Direct | 精确匹配 Routing Key | 点对点、特定消费 |
| Fanout | 广播到所有绑定的 Queue(忽略 Routing Key) | 广播通知 |
| Topic | 通配符匹配 Routing Key(* 一个词、# 零或多个词) | 灵活路由 |
| Headers | 匹配消息 Header 属性(较少使用) | 复杂条件路由 |
消息可靠投递
RabbitMQ 从 Producer 到 Consumer 的完整可靠链路:
① Producer Confirm(发布确认):
异步 Confirm
channel.confirmSelect(); // 开启 confirm 模式
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// Broker 已收到消息
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
// Broker 未收到,需要重发
}
});
channel.basicPublish(exchange, routingKey, props, body);
② Return 机制(消息路由失败回调):
// mandatory=true 时,消息无法路由到任何 Queue 会触发回调
channel.addReturnListener(returnMessage -> {
// 消息路由失败处理
log.warn("消息路由失败: {}", new String(returnMessage.getBody()));
});
channel.basicPublish(exchange, routingKey, true, props, body);
③ 持久化:
// Exchange 持久化
channel.exchangeDeclare("my-exchange", "direct", true);
// Queue 持久化
channel.queueDeclare("my-queue", true, false, false, null);
// 消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 1=非持久化,2=持久化
.build();
channel.basicPublish(exchange, routingKey, props, body);
④ Consumer 手动 ACK:
// 关闭自动确认
channel.basicConsume("my-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
processMessage(body);
// 处理成功,手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝并重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
});
死信队列(DLX)
消息变成死信的三种情况:
- 消息被
basicNack/basicReject且requeue=false - 消息 TTL 过期
- Queue 达到最大长度
配置死信队列
// 声明死信交换机
channel.exchangeDeclare("dlx-exchange", "direct");
channel.queueDeclare("dlx-queue", true, false, false, null);
channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");
// 业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
args.put("x-dead-letter-routing-key", "dlx-routing-key");
args.put("x-message-ttl", 60000); // 消息 TTL 60s
channel.queueDeclare("biz-queue", true, false, false, args);
延迟队列实现
RabbitMQ 没有原生延迟队列,通常通过 TTL + DLX 实现:
消息发到设置了 TTL 的队列 → 过期后自动转到死信队列 → 消费者从死信队列消费
也可使用 rabbitmq_delayed_message_exchange 插件实现精确延迟。
常见面试问题
Q1: RabbitMQ 如何保证消息不丢失?
答案:
三个环节都需要保障:
| 环节 | 方案 |
|---|---|
| Producer → Broker | Confirm 机制 + mandatory 参数 |
| Broker 存储 | Exchange、Queue、Message 三者持久化 |
| Broker → Consumer | 手动 ACK,消费失败 Nack 重回队列 |
警告
持久化有性能开销。对性能敏感且允许少量丢失的场景(如日志采集),可以不开启持久化。
Q2: RabbitMQ 的 Channel 和 Connection 有什么区别?
答案:
- Connection 是 TCP 长连接,建立成本高
- Channel 是 Connection 内的虚拟连接,轻量级,复用同一个 TCP 连接
- 一个 Connection 可以创建多个 Channel
- 通常一个线程对应一个 Channel(Channel 非线程安全)
这种设计避免了频繁创建 TCP 连接的开销,类似数据库连接池的思路。
Q3: 如何避免消息重复消费?
答案:
RabbitMQ 不保证消息恰好消费一次(at-most-once 或 at-least-once),消费端需要做 幂等处理:
- 唯一消息 ID:每条消息携带唯一 ID,消费前查 Redis/DB 判重
- 数据库唯一约束:利用业务唯一键(如订单号)防止重复插入
- 乐观锁/版本号:更新时带版本号条件
详见 消息幂等性。
Q4: RabbitMQ 的集群和镜像队列是什么?
答案:
- 普通集群:元数据在所有节点同步,但队列数据只存在声明它的节点上。消费其他节点的队列需要转发
- 镜像队列(Mirror Queue):队列数据在多个节点间复制,提供高可用。Master 宕机后 Slave 自动提升为 Master
- Quorum Queue(3.8+推荐):基于 Raft 协议的队列,替代镜像队列,更可靠、更简单