跳到主要内容

设计消息队列

问题

如何用 Go 设计或使用消息队列?理解消息队列在分布式系统中的核心作用。

答案

为什么需要消息队列

三大核心能力:异步解耦削峰

Go Channel 实现简易消息队列

// Topic: 一个 topic 对应多个消费者组
type Broker struct {
mu sync.RWMutex
topics map[string][]chan []byte
}

func NewBroker() *Broker {
return &Broker{topics: make(map[string][]chan []byte)}
}

// 订阅:为 topic 创建一个新的消费 channel
func (b *Broker) Subscribe(topic string, bufSize int) <-chan []byte {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan []byte, bufSize)
b.topics[topic] = append(b.topics[topic], ch)
return ch
}

// 发布:将消息广播给所有订阅者
func (b *Broker) Publish(topic string, msg []byte) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, ch := range b.topics[topic] {
select {
case ch <- msg:
default:
// 消费者太慢,丢弃(或记录日志)
}
}
}

使用 Kafka(sarama)

// 生产者
func ProduceMessage(brokers []string, topic string) error {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
config.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return err
}
defer producer.Close()

msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("order-123"),
Value: sarama.StringEncoder(`{"order_id":"123","amount":99}`),
}
partition, offset, err := producer.SendMessage(msg)
log.Printf("partition=%d offset=%d", partition, offset)
return err
}

// 消费者组
func ConsumeGroup(brokers []string, group, topic string) error {
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetNewest

client, _ := sarama.NewConsumerGroup(brokers, group, config)
handler := &ConsumerHandler{}

// 循环消费(重平衡自动处理)
for {
if err := client.Consume(context.Background(), []string{topic}, handler); err != nil {
return err
}
}
}

type ConsumerHandler struct{}

func (h *ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Printf("topic=%s partition=%d offset=%d value=%s",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 处理消息...
sess.MarkMessage(msg, "") // 标记已消费
}
return nil
}

使用 NATS(Go 生态轻量方案)

// NATS 更轻量,适合微服务内部通信
func NATSExample() {
nc, _ := nats.Connect(nats.DefaultURL)

// 订阅
nc.Subscribe("orders.*", func(msg *nats.Msg) {
log.Printf("Received: %s", string(msg.Data))
})

// 发布
nc.Publish("orders.created", []byte(`{"id":"123"}`))

// Request-Reply 模式
reply, _ := nc.Request("api.user.get", []byte("user-1"), 2*time.Second)
log.Printf("Reply: %s", string(reply.Data))
}

消息队列选型

特性KafkaNATSRabbitMQRedis Stream
吞吐量极高中高
持久化✅ 磁盘JetStream
顺序性Partition 内有序有序队列内有序有序
Go 生态sarama/franz-go官方 SDKamqp091-gogo-redis
适用场景大数据/日志微服务通信复杂路由轻量队列

可靠性保障

环节风险解决方案
生产端发送失败重试 + 本地消息表
队列宕机丢消息持久化 + 副本
消费端处理失败手动 ACK + 重试 + 死信队列
重复消费网络重传幂等性(唯一 ID + 去重表)

常见面试问题

Q1: 如何保证消息不丢失?

答案:三端保障:

  • 生产端:开启 ACK 确认(Kafka RequiredAcks=WaitForAll
  • Broker 端:持久化 + 多副本(Kafka ISR 机制)
  • 消费端:手动提交 Offset,处理完再 ACK

Q2: 如何保证消息顺序?

答案

  • Kafka:相同 Key 的消息发到同一个 Partition,单消费者消费
  • 全局有序:只设一个 Partition(牺牲吞吐量)
  • 局部有序:按业务 Key(如订单 ID)分区,同一订单消息有序

Q3: 消费积压怎么处理?

答案

  1. 增加消费者实例(Kafka 需增加 Partition)
  2. 消费者内部用 goroutine 并发处理
  3. 临时方案:新建临时 Topic,原消费者转发,扩大消费能力

Q4: Go Channel 和消息队列有什么区别?

答案:Channel 是进程内通信,无持久化、无跨服务能力。消息队列是分布式组件,支持持久化、多消费者、跨服务解耦。小规模可用 Channel + goroutine 模拟生产者消费者模式。

相关链接