Golang并发编程模型(一)
Worker Pool(工作池)
基本描述
多个goroutine并行处理任务,一个channel村任务,一个channel存结果
结构
任务 --> jobs channel
↓
+------+------+------+
| | | |
worker1 worker2 worker3
| | |
+------+------+------+
↓
results channel
典型场景
- 批量处理图片
- 并发下载文件
- 并发调用 API
- 日志分析
- 批量数据库操作
代码骨架
jobs := make(chan Job)
results := make(chan Result)
for i := 0; i < 5; i++ {
go worker(jobs, results)
}
这是最常见的模式
简单实现
任务池定义
// Result[T any] 任务执行结果定义
type Result[T any] struct {
Value T
Err error
}
// Task[T any] 任务执行器
type Task[T any] func(context.Context) (T, error)
// Pool[T any] 任务池
type Pool[T any] struct {
workers int
jobs chan Task[T]
results chan Result[T]
wg sync.WaitGroup
}
创建任务池
// NewPool 创建worker pool
func NewPool[T any](workers int, queueSize int) *Pool[T] {
return &Pool[T]{
workers: workers,
jobs: make(chan Task[T], queueSize),
results: make(chan Result[T], queueSize),
}
}
启动任务池,核心执行逻辑
func (p *Pool[T]) Start(cxt context.Context) {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func(id int) {
defer p.wg.Done()
for {
select {
case <-cxt.Done():
return
case task, ok := <-p.jobs:
if !ok {
return
}
value, err := task(cxt)
p.results <- Result[T]{Value: value, Err: err}
}
}
}(i)
}
// 所有worker退出后关闭results
go func() {
p.wg.Wait()
close(p.results)
}()
}
其他
func (p *Pool[T]) Submit(task Task[T]) {
p.jobs <- task
}
func (p *Pool[T]) Close() {
close(p.jobs)
}
func (p *Pool[T]) Results() <-chan Result[T] {
return p.results
}
测试
func main() {
cxt := context.Background()
pool := goroutine.NewPool[int](4, 10)
pool.Start(cxt)
for i := 1; i <= 10; i++ {
n := i
pool.Submit(func(ctx context.Context) (int, error) {
time.Sleep(time.Second)
return n * n, nil
})
}
pool.Close()
for result := range pool.Results() {
if result.Err != nil {
fmt.Println(result.Err)
continue
}
fmt.Println(result.Value)
}
}