跳到主要内容

RabbitMQ 核心原理

问题

RabbitMQ 的架构和核心概念是什么?Exchange 有几种类型?如何保证消息可靠投递?

答案

RabbitMQ 架构

核心概念

概念说明
Producer消息生产者,将消息发送到 Exchange
Exchange交换机,根据路由规则将消息分发到 Queue
Queue消息队列,存储消息,FIFO
BindingExchange 与 Queue 之间的绑定关系 + 路由键
Consumer消息消费者,从 Queue 中消费消息
Virtual Host虚拟主机,逻辑隔离(类似 MySQL 的 Database)
ConnectionTCP 长连接
Channel连接内的虚拟通道,复用 TCP 连接

Exchange 四种类型

类型路由规则适用场景
Direct精确匹配 Routing Key点对点、特定消费
Fanout广播到所有绑定的 Queue(忽略 Routing Key)广播通知
Topic通配符匹配 Routing Key(* 一个词、# 零或多个词)灵活路由
Headers匹配消息 Header 属性(较少使用)复杂条件路由

消息可靠投递

RabbitMQ 从 Producer 到 Consumer 的完整可靠链路:

① Producer Confirm(发布确认)

异步 Confirm
channel.confirmSelect();  // 开启 confirm 模式

channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// Broker 已收到消息
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
// Broker 未收到,需要重发
}
});

channel.basicPublish(exchange, routingKey, props, body);

② Return 机制(消息路由失败回调)

// mandatory=true 时,消息无法路由到任何 Queue 会触发回调
channel.addReturnListener(returnMessage -> {
// 消息路由失败处理
log.warn("消息路由失败: {}", new String(returnMessage.getBody()));
});

channel.basicPublish(exchange, routingKey, true, props, body);

③ 持久化

// Exchange 持久化
channel.exchangeDeclare("my-exchange", "direct", true);

// Queue 持久化
channel.queueDeclare("my-queue", true, false, false, null);

// 消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 1=非持久化,2=持久化
.build();
channel.basicPublish(exchange, routingKey, props, body);

④ Consumer 手动 ACK

// 关闭自动确认
channel.basicConsume("my-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
processMessage(body);
// 处理成功,手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝并重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
});

死信队列(DLX)

消息变成死信的三种情况:

  1. 消息被 basicNack/basicRejectrequeue=false
  2. 消息 TTL 过期
  3. Queue 达到最大长度
配置死信队列
// 声明死信交换机
channel.exchangeDeclare("dlx-exchange", "direct");
channel.queueDeclare("dlx-queue", true, false, false, null);
channel.queueBind("dlx-queue", "dlx-exchange", "dlx-routing-key");

// 业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
args.put("x-dead-letter-routing-key", "dlx-routing-key");
args.put("x-message-ttl", 60000); // 消息 TTL 60s
channel.queueDeclare("biz-queue", true, false, false, args);
延迟队列实现

RabbitMQ 没有原生延迟队列,通常通过 TTL + DLX 实现:

消息发到设置了 TTL 的队列 → 过期后自动转到死信队列 → 消费者从死信队列消费

也可使用 rabbitmq_delayed_message_exchange 插件实现精确延迟。


常见面试问题

Q1: RabbitMQ 如何保证消息不丢失?

答案

三个环节都需要保障:

环节方案
Producer → BrokerConfirm 机制 + mandatory 参数
Broker 存储Exchange、Queue、Message 三者持久化
Broker → Consumer手动 ACK,消费失败 Nack 重回队列
警告

持久化有性能开销。对性能敏感且允许少量丢失的场景(如日志采集),可以不开启持久化。

Q2: RabbitMQ 的 Channel 和 Connection 有什么区别?

答案

  • Connection 是 TCP 长连接,建立成本高
  • Channel 是 Connection 内的虚拟连接,轻量级,复用同一个 TCP 连接
  • 一个 Connection 可以创建多个 Channel
  • 通常一个线程对应一个 Channel(Channel 非线程安全)

这种设计避免了频繁创建 TCP 连接的开销,类似数据库连接池的思路。

Q3: 如何避免消息重复消费?

答案

RabbitMQ 不保证消息恰好消费一次(at-most-once 或 at-least-once),消费端需要做 幂等处理

  1. 唯一消息 ID:每条消息携带唯一 ID,消费前查 Redis/DB 判重
  2. 数据库唯一约束:利用业务唯一键(如订单号)防止重复插入
  3. 乐观锁/版本号:更新时带版本号条件

详见 消息幂等性

Q4: RabbitMQ 的集群和镜像队列是什么?

答案

  • 普通集群:元数据在所有节点同步,但队列数据只存在声明它的节点上。消费其他节点的队列需要转发
  • 镜像队列(Mirror Queue):队列数据在多个节点间复制,提供高可用。Master 宕机后 Slave 自动提升为 Master
  • Quorum Queue(3.8+推荐):基于 Raft 协议的队列,替代镜像队列,更可靠、更简单

相关链接