跳到主要内容

实现生产者消费者模式

问题

如何用 Go 实现生产者消费者模式?

答案

基础实现:Channel

Go 的 Channel 天然就是生产者消费者模式的最佳实现:

func BasicProducerConsumer() {
tasks := make(chan int, 100) // 缓冲 channel 作为队列
var wg sync.WaitGroup

// 启动 3 个消费者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for task := range tasks {
fmt.Printf("消费者 %d 处理任务 %d\n", id, task)
time.Sleep(100 * time.Millisecond)
}
}(i)
}

// 生产者
for i := 0; i < 20; i++ {
tasks <- i
}
close(tasks) // 关闭 channel,消费者 range 结束

wg.Wait()
fmt.Println("所有任务处理完成")
}

多生产者 + 多消费者

func MultiProducerMultiConsumer() {
tasks := make(chan string, 50)
results := make(chan string, 50)
var prodWg, consWg sync.WaitGroup

// 多个生产者
for i := 0; i < 3; i++ {
prodWg.Add(1)
go func(id int) {
defer prodWg.Done()
for j := 0; j < 10; j++ {
task := fmt.Sprintf("producer-%d-task-%d", id, j)
tasks <- task
}
}(i)
}

// 多个消费者
for i := 0; i < 5; i++ {
consWg.Add(1)
go func(id int) {
defer consWg.Done()
for task := range tasks {
result := process(task)
results <- result
}
}(i)
}

// 生产者完成后关闭 tasks
go func() {
prodWg.Wait()
close(tasks)
}()

// 消费者完成后关闭 results
go func() {
consWg.Wait()
close(results)
}()

// 收集结果
for result := range results {
fmt.Println(result)
}
}

带背压控制

// 当消费者处理不过来时,生产者自动减速
func ProducerWithBackpressure() {
// 小缓冲 = 自然背压
tasks := make(chan Task, 10) // 缓冲满时,生产者阻塞

// 生产者
go func() {
defer close(tasks)
for i := 0; ; i++ {
task := generateTask(i)
select {
case tasks <- task:
// 正常发送
default:
// 队列满,可以选择丢弃、记录或等待
log.Printf("队列满,等待中...")
tasks <- task // 阻塞等待
}
}
}()

// 消费者
for task := range tasks {
processTask(task)
}
}

Fan-Out / Fan-In 模式

// Fan-Out: 一个输入分发给多个 worker
// Fan-In: 多个 worker 结果合并到一个 channel
func FanOutFanIn(input <-chan int, workerCount int) <-chan int {
// Fan-Out
workers := make([]<-chan int, workerCount)
for i := 0; i < workerCount; i++ {
workers[i] = worker(input)
}

// Fan-In
return merge(workers...)
}

func worker(input <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range input {
out <- n * n // 处理:平方
}
}()
return out
}

func merge(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup

for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}

go func() {
wg.Wait()
close(out)
}()

return out
}

Pipeline 模式

// 多阶段管道处理
func Pipeline() {
// Stage 1: 生成数据
nums := generate(1, 2, 3, 4, 5)
// Stage 2: 平方
squared := stage(nums, func(n int) int { return n * n })
// Stage 3: +10
result := stage(squared, func(n int) int { return n + 10 })

for v := range result {
fmt.Println(v) // 11, 14, 19, 26, 35
}
}

func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}

func stage(in <-chan int, fn func(int) int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- fn(n)
}
}()
return out
}

常见面试问题

Q1: Channel 缓冲大小怎么设?

答案

  • 无缓冲(0):严格同步,生产者等消费者取走才继续
  • 小缓冲(10~100):平衡吞吐和内存,有自然背压
  • 大缓冲(1000+):允许突发生产,但要监控积压

Q2: 如何优雅地停止生产者消费者?

答案

  • context.Context 通知所有 goroutine 退出
  • 生产者退出后 close(channel)
  • 消费者用 for range 自动在 channel 关闭时退出

相关链接