设计消息队列
问题
如何用 Go 设计或使用消息队列?理解消息队列在分布式系统中的核心作用。
答案
为什么需要消息队列
三大核心能力:异步、解耦、削峰。
Go Channel 实现简易消息队列
// Topic: 一个 topic 对应多个消费者组
type Broker struct {
mu sync.RWMutex
topics map[string][]chan []byte
}
func NewBroker() *Broker {
return &Broker{topics: make(map[string][]chan []byte)}
}
// 订阅:为 topic 创建一个新的消费 channel
func (b *Broker) Subscribe(topic string, bufSize int) <-chan []byte {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan []byte, bufSize)
b.topics[topic] = append(b.topics[topic], ch)
return ch
}
// 发布:将消息广播给所有订阅者
func (b *Broker) Publish(topic string, msg []byte) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, ch := range b.topics[topic] {
select {
case ch <- msg:
default:
// 消费者太慢,丢弃(或记录日志)
}
}
}
使用 Kafka(sarama)
// 生产者
func ProduceMessage(brokers []string, topic string) error {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return err
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("order-123"),
Value: sarama.StringEncoder(`{"order_id":"123","amount":99}`),
}
partition, offset, err := producer.SendMessage(msg)
log.Printf("partition=%d offset=%d", partition, offset)
return err
}
// 消费者组
func ConsumeGroup(brokers []string, group, topic string) error {
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetNewest
client, _ := sarama.NewConsumerGroup(brokers, group, config)
handler := &ConsumerHandler{}
// 循环消费(重平衡自动处理)
for {
if err := client.Consume(context.Background(), []string{topic}, handler); err != nil {
return err
}
}
}
type ConsumerHandler struct{}
func (h *ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Printf("topic=%s partition=%d offset=%d value=%s",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 处理消息...
sess.MarkMessage(msg, "") // 标记已消费
}
return nil
}
使用 NATS(Go 生态轻量方案)
// NATS 更轻量,适合微服务内部通信
func NATSExample() {
nc, _ := nats.Connect(nats.DefaultURL)
// 订阅
nc.Subscribe("orders.*", func(msg *nats.Msg) {
log.Printf("Received: %s", string(msg.Data))
})
// 发布
nc.Publish("orders.created", []byte(`{"id":"123"}`))
// Request-Reply 模式
reply, _ := nc.Request("api.user.get", []byte("user-1"), 2*time.Second)
log.Printf("Reply: %s", string(reply.Data))
}
消息队列选型
| 特性 | Kafka | NATS | RabbitMQ | Redis Stream |
|---|---|---|---|---|
| 吞吐量 | 极高 | 高 | 中 | 中高 |
| 持久化 | ✅ 磁盘 | JetStream | ✅ | ✅ |
| 顺序性 | Partition 内有序 | 有序 | 队列内有序 | 有序 |
| Go 生态 | sarama/franz-go | 官方 SDK | amqp091-go | go-redis |
| 适用场景 | 大数据/日志 | 微服务通信 | 复杂路由 | 轻量队列 |
可靠性保障
| 环节 | 风险 | 解决方案 |
|---|---|---|
| 生产端 | 发送失败 | 重试 + 本地消息表 |
| 队列 | 宕机丢消息 | 持久化 + 副本 |
| 消费端 | 处理失败 | 手动 ACK + 重试 + 死信队列 |
| 重复消费 | 网络重传 | 幂等性(唯一 ID + 去重表) |
常见面试问题
Q1: 如何保证消息不丢失?
答案:三端保障:
- 生产端:开启 ACK 确认(Kafka
RequiredAcks=WaitForAll) - Broker 端:持久化 + 多副本(Kafka ISR 机制)
- 消费端:手动提交 Offset,处理完再 ACK
Q2: 如何保证消息顺序?
答案:
- Kafka:相同 Key 的消息发到同一个 Partition,单消费者消费
- 全局有序:只设一个 Partition(牺牲吞吐量)
- 局部有序:按业务 Key(如订单 ID)分区,同一订单消息有序
Q3: 消费积压怎么处理?
答案:
- 增加消费者实例(Kafka 需增加 Partition)
- 消费者内部用 goroutine 并发处理
- 临时方案:新建临时 Topic,原消费者转发,扩大消费能力
Q4: Go Channel 和消息队列有什么区别?
答案:Channel 是进程内通信,无持久化、无跨服务能力。消息队列是分布式组件,支持持久化、多消费者、跨服务解耦。小规模可用 Channel + goroutine 模拟生产者消费者模式。