实现 Goroutine 池
问题
如何用 Go 实现一个 goroutine 池?为什么需要控制 goroutine 数量?
答案
为什么需要 Goroutine 池
虽然 goroutine 轻量(~2KB 栈),但无限创建仍有问题:
- 调度开销随数量增长
- 内存占用不可控
- 文件描述符/连接数可能耗尽
基础实现:Channel + Worker
type Pool struct {
taskQueue chan func()
wg sync.WaitGroup
}
func NewPool(workerCount int) *Pool {
p := &Pool{
taskQueue: make(chan func(), workerCount*2), // 缓冲队列
}
// 启动固定数量的 worker
for i := 0; i < workerCount; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.taskQueue {
task()
}
}()
}
return p
}
// 提交任务
func (p *Pool) Submit(task func()) {
p.taskQueue <- task
}
// 关闭池:等待所有任务完成
func (p *Pool) Shutdown() {
close(p.taskQueue)
p.wg.Wait()
}
// 使用
func main() {
pool := NewPool(10)
defer pool.Shutdown()
for i := 0; i < 100; i++ {
i := i
pool.Submit(func() {
fmt.Printf("处理任务 %d\n", i)
time.Sleep(100 * time.Millisecond)
})
}
}
进阶:带超时和错误收集
type TaskPool struct {
taskQueue chan func() error
results chan error
wg sync.WaitGroup
}
func NewTaskPool(workers, queueSize int) *TaskPool {
p := &TaskPool{
taskQueue: make(chan func() error, queueSize),
results: make(chan error, queueSize),
}
for i := 0; i < workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.taskQueue {
if err := task(); err != nil {
p.results <- err
}
}
}()
}
return p
}
// 带超时的提交
func (p *TaskPool) SubmitWithTimeout(task func() error, timeout time.Duration) error {
select {
case p.taskQueue <- task:
return nil
case <-time.After(timeout):
return fmt.Errorf("submit timeout")
}
}
func (p *TaskPool) Shutdown() {
close(p.taskQueue)
p.wg.Wait()
close(p.results)
}
func (p *TaskPool) Errors() <-chan error {
return p.results
}
使用 semaphore 控制并发
import "golang.org/x/sync/semaphore"
func ProcessWithSemaphore(items []string, maxConcurrency int64) {
sem := semaphore.NewWeighted(maxConcurrency)
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1)
sem.Acquire(context.Background(), 1)
go func(item string) {
defer wg.Done()
defer sem.Release(1)
process(item)
}(item)
}
wg.Wait()
}
使用 errgroup
import "golang.org/x/sync/errgroup"
func ProcessWithErrgroup(items []string, maxConcurrency int) error {
g, _ := errgroup.WithContext(context.Background())
g.SetLimit(maxConcurrency)
for _, item := range items {
item := item
g.Go(func() error {
return process(item)
})
}
return g.Wait() // 返回第一个非 nil 错误
}
三方库 ants
import "github.com/panjf2000/ants/v2"
func UseAnts() {
pool, _ := ants.NewPoolWithFunc(10, func(payload interface{}) {
n := payload.(int)
fmt.Printf("处理: %d\n", n)
})
defer pool.Release()
for i := 0; i < 100; i++ {
pool.Invoke(i)
}
}
方案对比
| 方案 | 复杂度 | 特点 |
|---|---|---|
| Channel + Worker | 低 | 最基础,适合简单场景 |
| semaphore | 低 | 控制并发数,不复用 goroutine |
| errgroup | 低 | 自带错误处理和限流 |
| ants | 中 | 功能最全,goroutine 复用 |
实际项目推荐
简单并发控制用 errgroup.SetLimit(),复杂场景用 ants。手写 Pool 主要用于面试和理解原理。
常见面试问题
Q1: goroutine 泄漏怎么排查?
答案:
runtime.NumGoroutine()监控 goroutine 数量pprof的 goroutine profile 查看阻塞位置- 常见原因:channel 无消费者、忘记关闭 channel、Context 未取消
Q2: goroutine 池和直接创建 goroutine 的区别?
答案:池复用 goroutine,减少创建销毁开销,控制最大并发数。直接创建更简单但可能导致资源耗尽。Go 的 goroutine 足够轻量,中小规模直接创建即可,只有大量任务时才需要池化。