Administrator
Administrator
Published on 2026-06-09 / 10 Visits
0
0

Golang并发编程

Golang并发编程模型(一)

Worker Pool(工作池)

基本描述

多个goroutine并行处理任务,一个channel村任务,一个channel存结果

结构

任务 --> jobs channel
            ↓
     +------+------+------+
     |      |      |      |
 worker1 worker2 worker3
     |      |      |    
     +------+------+------+
            ↓
      results channel

典型场景

  • 批量处理图片
  • 并发下载文件
  • 并发调用 API
  • 日志分析
  • 批量数据库操作

代码骨架

jobs := make(chan Job)
results := make(chan Result)

for i := 0; i < 5; i++ {
    go worker(jobs, results)
}

这是最常见的模式


简单实现

任务池定义

// Result[T any] 任务执行结果定义
type Result[T any] struct {
	Value T
	Err   error
}

// Task[T any] 任务执行器
type Task[T any] func(context.Context) (T, error)

// Pool[T any] 任务池
type Pool[T any] struct {
	workers int
	jobs    chan Task[T]
	results chan Result[T]
	wg      sync.WaitGroup
}

创建任务池

// NewPool 创建worker pool
func NewPool[T any](workers int, queueSize int) *Pool[T] {
	return &Pool[T]{
		workers: workers,
		jobs:    make(chan Task[T], queueSize),
		results: make(chan Result[T], queueSize),
	}
}

启动任务池,核心执行逻辑

func (p *Pool[T]) Start(cxt context.Context) {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)

		go func(id int) {
			defer p.wg.Done()
			for {
				select {
				case <-cxt.Done():
					return
				case task, ok := <-p.jobs:
					if !ok {
						return
					}
					value, err := task(cxt)
					p.results <- Result[T]{Value: value, Err: err}
				}
			}
		}(i)
	}

	// 所有worker退出后关闭results
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
}

其他

func (p *Pool[T]) Submit(task Task[T]) {
	p.jobs <- task
}

func (p *Pool[T]) Close() {
	close(p.jobs)
}

func (p *Pool[T]) Results() <-chan Result[T] {
	return p.results
}

测试

func main() {
	cxt := context.Background()

	pool := goroutine.NewPool[int](4, 10)

	pool.Start(cxt)

	for i := 1; i <= 10; i++ {
		n := i
		pool.Submit(func(ctx context.Context) (int, error) {
			time.Sleep(time.Second)
			return n * n, nil
		})
	}

	pool.Close()

	for result := range pool.Results() {
		if result.Err != nil {
			fmt.Println(result.Err)
			continue
		}

		fmt.Println(result.Value)
	}
}

Comment