并发模式
Go 语言以其简洁的并发原语(goroutine + channel)著称,但如何将它们组合成可靠、高效的并发程序,则需要掌握一些经典的并发模式。这些模式是从大量实践中提炼出来的通用解决方案,能够帮助开发者避免常见的并发陷阱(竞态条件、goroutine 泄漏、死锁等),并构建出结构清晰、易于维护的并发系统。
本文将系统讲解 Go 中最常用的七种并发模式,每种模式都配有完整的代码示例。
Worker Pool(工作池模式)
工作池是一种限制并发 goroutine 数量的模式。系统预先创建固定数量的 worker goroutine,它们从一个共享的任务 channel 中读取任务并执行。当所有 worker 都忙时,新任务会在 channel 中排队等待。
Worker Pool 的核心思想是资源复用——避免为每个任务都创建新的 goroutine,从而防止系统因 goroutine 数量爆炸而导致内存溢出或 CPU 上下文切换过于频繁。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d 开始处理任务 %d\n", id, job)
time.Sleep(time.Millisecond * 500) // 模拟工作耗时
results <- job * 2
fmt.Printf("Worker %d 完成任务 %d,结果: %d\n", id, job, job*2)
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 启动固定数量的 worker
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 关闭任务 channel,worker 处理完后会自动退出 range 循环
// 启动一个 goroutine 等待所有 worker 完成后关闭 results channel
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("收到结果: %d\n", result)
}
fmt.Println("所有任务处理完毕")
}工作池模式要点
- 固定 worker 数量:通常设置为
runtime.NumCPU()或根据任务类型手动调整 - 带缓冲的 channel:任务 channel 设置缓冲区大小,避免发送方阻塞
- 优雅关闭:关闭 jobs channel 后,worker 的
range循环会自动退出;使用sync.WaitGroup确保所有 worker 完成后再关闭 results channel - 适用场景:HTTP 请求处理、数据库批量操作、文件并行处理等
Pipeline(流水线模式)
流水线模式将数据处理过程拆分为多个独立的阶段,每个阶段由一个或多个 goroutine 负责。阶段之间通过 channel 连接,前一个阶段的输出作为后一个阶段的输入,数据像流水线一样依次流经各阶段。
Pipeline 的核心优势是关注点分离——每个阶段只需要关注自己的处理逻辑,阶段之间通过 channel 解耦。这使得代码更易理解、测试和扩展。
package main
import (
"fmt"
)
// 阶段1:生成器——产生整数序列
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2:平方运算
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段3:过滤奇数
func filterOdd(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%2 == 0 {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// 构建流水线:生成 → 平方 → 过滤奇数
nums := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(nums)
filtered := filterOdd(squared)
// 消费最终结果
for result := range filtered {
fmt.Printf("结果: %d\n", result)
}
// 输出: 4, 16, 36, 64, 100
}流水线模式要点
- 单向 channel:每个阶段的输入输出 channel 使用
<-chan或chan<-明确方向,增强类型安全 - 自动传播关闭:当上游 channel 关闭且数据全部读取完毕后,当前阶段处理完毕并关闭自己的输出 channel
- 背压处理:无缓冲 channel 天然提供背压——下游处理慢时,上游会自动阻塞等待
- 适用场景:数据 ETL 流程、图像处理管道、日志分析等
Fan-In / Fan-Out(扇入扇出模式)
- Fan-Out(扇出):将一个输入 channel 的数据分发到多个 worker goroutine 并行处理
- Fan-In(扇入):将多个 goroutine 的输出 channel 合并为一个统一的输出 channel
Fan-In/Fan-Out 是 Pipeline 模式的扩展。当流水线中某个阶段计算成本较高时,可以通过 Fan-Out 并行处理来加速;再通过 Fan-In 将结果汇聚,供下游消费。
package main
import (
"fmt"
"sync"
"time"
)
// 模拟耗时计算
func heavyCompute(n int) int {
time.Sleep(time.Millisecond * 200)
return n * n
}
// Fan-Out: 从输入 channel 读取并分发给多个 worker
func fanOut(in <-chan int, numWorkers int) []<-chan int {
var workers []<-chan int
for i := 0; i < numWorkers; i++ {
ch := make(chan int)
workers = append(workers, ch)
go func(workerCh chan<- int) {
defer close(workerCh)
for n := range in {
result := heavyCompute(n)
workerCh <- result
}
}(ch)
}
return workers
}
// Fan-In: 将多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个输入 channel 启动一个 goroutine
multiplexer := func(ch <-chan int) {
defer wg.Done()
for v := range ch {
out <- v
}
}
wg.Add(len(channels))
for _, ch := range channels {
go multiplexer(ch)
}
// 所有输入 channel 关闭后,关闭输出 channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 生成数据
in := make(chan int)
go func() {
defer close(in)
for i := 1; i <= 10; i++ {
in <- i
}
}()
// Fan-Out: 启动 4 个 worker 并行处理
workers := fanOut(in, 4)
// Fan-In: 合并所有 worker 的结果
results := fanIn(workers...)
// 消费结果
for result := range results {
fmt.Printf("处理结果: %d\n", result)
}
}扇入扇出模式要点
- 并行加速:Fan-Out 的 worker 数量应根据任务类型调整——CPU 密集型任务适合
GOMAXPROCS,IO 密集型可以更多 - 结果无序:多个 worker 并行处理,结果到达顺序与输入顺序不一定一致。如需保序,需要额外的排序或编号机制
- 适用场景:并行网络爬虫、批量 API 调用、分布式任务处理
Future / Promise 模式
Future/Promise 模式是一种异步编程范式。发起一个异步操作后立即获得一个”Future”(代表未来某个时刻的结果),调用方可以在需要结果时阻塞等待,或继续做其他事情后再获取结果。在 Go 中,通常通过 channel 来实现这一模式。
Go 并没有内置的 Future/Promise 类型,但 channel 天然适合实现这个模式。一个带一个缓冲区的 channel 就是一个最简单的 Promise:goroutine 写入结果,调用方读取结果。
package main
import (
"fmt"
"math/rand"
"time"
)
// Future 是一个只读 channel,代表未来的结果
type Future[T any] <-chan T
// Promise 是一个可写的 channel,用于写入结果
type Promise[T any] chan T
// NewPromise 创建一个 Promise
func NewPromise[T any]() Promise[T] {
return make(chan T, 1) // 缓冲为 1,写入不会阻塞
}
// Future 获取 Future 视图
func (p Promise[T]) Future() Future[T] {
return p
}
// Resolve 写入结果
func (p Promise[T]) Resolve(value T) {
p <- value
close(p)
}
// AsyncCompute 异步执行计算,返回 Future
func AsyncCompute(a, b int) Future[int] {
p := NewPromise[int]()
go func() {
// 模拟耗时计算
time.Sleep(time.Duration(rand.Intn(500)+100) * time.Millisecond)
result := a + b
p.Resolve(result)
}()
return p.Future()
}
func main() {
// 发起多个异步计算
f1 := AsyncCompute(10, 20)
f2 := AsyncCompute(30, 40)
f3 := AsyncCompute(50, 60)
fmt.Println("异步计算已发起,继续做其他事情...")
// 需要结果时,从 Future 中读取(会阻塞直到结果就绪)
fmt.Printf("结果1: %d\n", <-f1)
fmt.Printf("结果2: %d\n", <-f2)
fmt.Printf("结果3: %d\n", <-f3)
}Future/Promise 模式要点
- 带缓冲的 channel:缓冲区大小为 1,确保写入方不会阻塞
- 泛型支持:Go 1.18+ 的泛型让 Future/Promise 可以用于任意类型
- 组合使用:Future 可以与
select组合实现超时、取消等高级功能 - 适用场景:异步 HTTP 请求、并发数据库查询、任务编排
Timeout 与 Cancellation 模式
通过 select 语句配合 time.After(或 time.NewTimer)和 context.Context,实现对长时间运行操作的超时控制和主动取消,防止 goroutine 无限期阻塞。
goroutine 泄漏是 Go 程序中最常见的资源泄漏问题之一。如果 goroutine 因等待 channel、网络 IO 等操作而永久阻塞,且没有任何机制取消它,就会导致 goroutine 泄漏。Timeout 与 Cancellation 模式正是解决这一问题的关键手段。
基于 select + time.After 的超时控制
package main
import (
"fmt"
"time"
)
func slowOperation() string {
time.Sleep(2 * time.Second) // 模拟耗时操作
return "操作完成"
}
func main() {
ch := make(chan string)
go func() {
ch <- slowOperation()
}()
select {
case result := <-ch:
fmt.Println(result)
case <-time.After(1 * time.Second):
fmt.Println("操作超时!")
}
// 注意:此示例中 slowOperation 的 goroutine 仍然在运行
// 生产环境中应使用 context.Context 来实现真正的取消
}基于 context.Context 的优雅取消
package main
import (
"context"
"fmt"
"time"
)
func slowOperationWithContext(ctx context.Context) (string, error) {
resultCh := make(chan string, 1)
errorCh := make(chan error, 1)
go func() {
// 模拟耗时操作,同时检查 context 是否已取消
select {
case <-time.After(3 * time.Second):
resultCh <- "操作完成"
case <-ctx.Done():
errorCh <- ctx.Err()
}
}()
select {
case result := <-resultCh:
return result, nil
case err := <-errorCh:
return "", err
}
}
func main() {
// 带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := slowOperationWithContext(ctx)
if err != nil {
fmt.Printf("操作失败: %v\n", err) // 输出: 操作失败: context deadline exceeded
return
}
fmt.Println(result)
}Timeout 与 Cancellation 模式要点
time.After的隐患:每次调用都会创建新的Timer,在循环中使用可能导致内存泄漏,推荐使用time.NewTimer并手动Stop()context.Context是标准方案:Go 社区的推荐做法,支持超时(WithTimeout)、手动取消(WithCancel)、值传递(WithValue)和截止时间(WithDeadline)- 传播取消信号:将
ctx作为第一个参数传递给所有函数,形成取消信号的传播链 - 适用场景:HTTP 请求超时、数据库查询取消、分布式任务协调
Rate Limiting(限流模式)
限流模式控制单位时间内的操作频率,防止系统因请求过多而过载。常见的限流算法有令牌桶(Token Bucket)、**滑动窗口(Sliding Window)和漏桶(Leaky Bucket)**等。
基于 time.Ticker 的简单限流
package main
import (
"fmt"
"time"
)
func main() {
// 每秒最多处理 3 个请求
limiter := time.NewTicker(time.Second / 3)
defer limiter.Stop()
requests := []string{"A", "B", "C", "D", "E", "F", "G", "H"}
for _, req := range requests {
<-limiter.C // 等待令牌
fmt.Printf("[%s] 处理请求 %s\n", time.Now().Format("15:04:05.000"), req)
}
}基于令牌桶的限流(使用 golang.org/x/time/rate)
package main
import (
"context"
"fmt"
"golang.org/x/time/rate"
"time"
)
func main() {
// 每秒产生 2 个令牌,桶容量为 5(允许突发 5 个请求)
limiter := rate.NewLimiter(2, 5)
requests := []string{"A", "B", "C", "D", "E", "F", "G", "H"}
for _, req := range requests {
// Wait 阻塞等待直到获取到令牌
err := limiter.Wait(context.Background())
if err != nil {
fmt.Printf("请求 %s 被拒绝: %v\n", req, err)
continue
}
fmt.Printf("[%s] 处理请求 %s\n", time.Now().Format("15:04:05.000"), req)
}
// 使用 Allow 检查但不阻塞
if limiter.Allow() {
fmt.Println("请求被允许")
} else {
fmt.Println("请求被限流")
}
}限流模式要点
- 令牌桶算法:以固定速率向桶中放入令牌,请求需要获取令牌才能执行。桶有最大容量,允许一定程度的突发流量
golang.org/x/time/rate:Go 官方扩展库提供的限流器,支持令牌桶算法,生产环境推荐使用Wait()vsAllow():Wait()阻塞等待令牌,Allow()非阻塞地检查是否可以获得令牌- 适用场景:API 网关限流、数据库连接池控制、消息队列消费速率限制
生产者-消费者模型
生产者-消费者模型通过一个共享的缓冲队列(在 Go 中通常是 buffered channel)将数据的产生和消费解耦。生产者向 channel 写入数据,消费者从 channel 读取数据。两者可以独立地以不同的速率运行。
生产者-消费者模型是并发编程中最基础、最常用的模式之一。Go 的 channel 本身就是一个线程安全的队列,天然适合实现这一模式,无需额外加锁。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Task 表示要处理的任务
type Task struct {
ID int
Data string
}
// Producer 生产者:生成任务并发送到 channel
func producer(id int, tasks chan<- Task, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
task := Task{
ID: id*100 + i,
Data: fmt.Sprintf("Producer-%d 的第 %d 个任务", id, i+1),
}
tasks <- task
fmt.Printf("[Producer-%d] 生产任务: %d\n", id, task.ID)
time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
}
}
// Consumer 消费者:从 channel 读取任务并处理
func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("[Consumer-%d] 消费任务: %d, 数据: %s\n", id, task.ID, task.Data)
time.Sleep(time.Duration(rand.Intn(500)+200) * time.Millisecond) // 模拟处理耗时
}
}
func main() {
const bufferSize = 10
tasks := make(chan Task, bufferSize) // 带缓冲的 channel 充当队列
var wg sync.WaitGroup
// 启动 2 个生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go producer(i, tasks, &wg)
}
// 启动 3 个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, tasks, &wg)
}
// 等待所有生产者完成
// 注意:这里需要分离生产者和消费者的 WaitGroup
// 上面的简化示例中,消费者依赖 tasks channel 的关闭来退出
// 实际项目推荐使用两个独立的 WaitGroup
go func() {
wg.Wait()
close(tasks)
}()
// 等待消费者处理完所有任务
// (此处简化处理,实际需要单独追踪消费者的完成状态)
time.Sleep(3 * time.Second)
fmt.Println("程序结束")
}更健壮的生产者-消费者实现
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Task struct {
ID int
Data string
}
func main() {
const bufferSize = 10
tasks := make(chan Task, bufferSize)
var producerWg, consumerWg sync.WaitGroup
// 启动 2 个生产者
for i := 1; i <= 2; i++ {
producerWg.Add(1)
go func(id int) {
defer producerWg.Done()
for j := 0; j < 5; j++ {
tasks <- Task{ID: id*100 + j, Data: fmt.Sprintf("Task-%d", id*100+j)}
time.Sleep(time.Duration(rand.Intn(200)+50) * time.Millisecond)
}
}(i)
}
// 生产者全部完成后关闭 channel
go func() {
producerWg.Wait()
close(tasks)
}()
// 启动 3 个消费者
for i := 1; i <= 3; i++ {
consumerWg.Add(1)
go func(id int) {
defer consumerWg.Done()
for task := range tasks {
fmt.Printf("[Consumer-%d] 处理: %+v\n", id, task)
time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
}
}(i)
}
// 等待所有消费者完成
consumerWg.Wait()
fmt.Println("所有任务处理完毕")
}生产者-消费者模型要点
- 缓冲 channel 解耦:缓冲区大小决定了生产者和消费者之间的解耦程度——缓冲区越大,生产者越不容易被阻塞
- 关闭 channel 的时机:只应由生产者关闭 channel,且必须确保所有生产者都已完成
- 独立 WaitGroup:生产者和消费者应使用各自独立的
WaitGroup,避免混淆 - 适用场景:日志收集系统、消息队列、任务调度系统、事件驱动架构
练习题
练习 1:实现一个带限流的工作池
结合 Worker Pool 和 Rate Limiting 模式,实现一个工作池:启动 5 个 worker,同时限制任务启动速率不超过每秒 3 个,总共处理 20 个任务。
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, job)
time.Sleep(300 * time.Millisecond)
results <- job * 2
}
}
func main() {
const numJobs = 20
const numWorkers = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 限流器:每秒 3 个任务
limiter := rate.NewLimiter(3, 3)
// 启动 workers
var workerWg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
workerWg.Add(1)
go worker(w, jobs, results, &workerWg)
}
go func() {
workerWg.Wait()
close(results)
}()
// 以限流速率发送任务
go func() {
for j := 1; j <= numJobs; j++ {
limiter.Wait(context.Background())
jobs <- j
fmt.Printf("发送任务 %d\n", j)
}
close(jobs)
}()
for r := range results {
fmt.Printf("收到结果: %d\n", r)
}
fmt.Println("全部完成")
}这个实现将 Worker Pool 和 Rate Limiting 结合:limiter.Wait() 确保任务发送速率不超过每秒 3 个,而 5 个 worker 可以并行处理这些任务。
练习 2:实现带超时的 Pipeline
构建一个三阶段 Pipeline:生成器产生 1~20 的整数 → 计算阶段对每个数执行耗时 200ms 的平方运算 → 消费阶段打印结果。要求使用 context.WithTimeout 设置总超时时间为 2 秒,超时后优雅停止所有阶段。
package main
import (
"context"
"fmt"
"time"
)
func generator(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
fmt.Println("生成器:收到取消信号")
return
}
}
}()
return out
}
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-ctx.Done():
fmt.Println("计算阶段:收到取消信号")
return
case <-time.After(200 * time.Millisecond):
select {
case out <- n * n:
case <-ctx.Done():
fmt.Println("计算阶段:发送时收到取消信号")
return
}
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
nums := make([]int, 20)
for i := range nums {
nums[i] = i + 1
}
stage1 := generator(ctx, nums...)
stage2 := square(ctx, stage1)
count := 0
for result := range stage2 {
fmt.Printf("结果: %d\n", result)
count++
}
fmt.Printf("共处理 %d 个结果\n", count)
}关键点:每个阶段都通过 select 监听 ctx.Done(),超时后逐级传播取消信号,实现优雅关闭。
练习 3:实现 Future 的错误处理
扩展示例中的 AsyncCompute Future/Promise 模式,使其能够处理错误情况。定义一个 Result[T] 类型,同时包含结果值和错误信息。实现 AsyncDivide(a, b float64) 异步除法函数,当除数为 0 时返回错误。
package main
import (
"fmt"
"math/rand"
"time"
)
// Result 包含值和错误
type Result[T any] struct {
Value T
Err error
}
type Future[T any] <-chan Result[T]
type Promise[T any] chan Result[T]
func NewPromise[T any]() Promise[T] {
return make(chan Result[T], 1)
}
func (p Promise[T]) Future() Future[T] {
return p
}
func (p Promise[T]) Resolve(value T, err error) {
p <- Result[T]{Value: value, Err: err}
close(p)
}
// 异步除法
func AsyncDivide(a, b float64) Future[float64] {
p := NewPromise[float64]()
go func() {
time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
if b == 0 {
p.Resolve(0, fmt.Errorf("除数不能为零"))
return
}
p.Resolve(a/b, nil)
}()
return p.Future()
}
func main() {
divisions := []struct{ a, b float64 }{
{10, 3}, {20, 4}, {5, 0}, {100, 10}, {8, 0},
}
var futures []Future[float64]
for _, d := range divisions {
futures = append(futures, AsyncDivide(d.a, d.b))
}
for i, f := range futures {
result := <-f
if result.Err != nil {
fmt.Printf("除法 %d 失败: %v\n", i+1, result.Err)
} else {
fmt.Printf("除法 %d 结果: %.2f\n", i+1, result.Value)
}
}
}通过泛型 Result[T] 结构体统一封装值和错误,使 Future 模式能正确处理异常情况,符合 Go 的错误处理惯例。
Go 的并发原语虽然简单(goroutine + channel),但通过组合它们可以实现强大而灵活的并发模式。掌握这些模式不仅能让代码更健壮、更高效,还能帮助你更好地理解并发编程的核心思想。
