sync 包
sync 包提供了 Go 语言中基本的并发同步原语。当多个 Goroutine 需要共享资源或协调执行顺序时,sync 包中的工具是必不可少的。它包含 WaitGroup(等待组)、Mutex(互斥锁)、RWMutex(读写锁)、Once(单次执行)、Map(并发安全映射)、Pool(对象池)等核心组件。
sync.WaitGroup
sync.WaitGroup 用于等待一组 Goroutine 完成。它内部维护一个计数器,通过 Add(n) 增加计数、Done() 减少 1、Wait() 阻塞直到计数器归零。常用于”主 Goroutine 等待多个工作 Goroutine 完成”的场景。
基本用法
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 等价于 wg.Add(-1)
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Duration(id) * 100 * time.Millisecond)
fmt.Printf("Worker %d 完成\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 每启动一个 worker 增加计数
go worker(i, &wg)
}
wg.Wait() // 阻塞直到所有 worker 完成
fmt.Println("所有 Worker 完成")
}WaitGroup 使用注意事项
Add()必须在Wait()之前调用,且在启动 Goroutine 之前调用wg应该通过指针传递(*sync.WaitGroup),否则每个 Goroutine 操作的是副本Add()的值可以为负数,但不应该使计数器变为负数(会导致 panic)- 传递
wg时要在go语句之前调用Add(),避免竞态 WaitGroup不是可重入的,不能重复使用同一实例
WaitGroup 的常见错误
// ❌ 错误 1:在 Goroutine 内部调用 Add(竞态)
for i := 0; i < 5; i++ {
go func() {
wg.Add(1) // 可能 Wait 已经返回了
defer wg.Done()
}()
}
wg.Wait()
// ✅ 正确:在启动 Goroutine 之前调用 Add
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
}()
}
wg.Wait()sync.Mutex
sync.Mutex 提供了独占式的锁机制。同一时刻只有一个 Goroutine 可以持有锁,其他尝试获取锁的 Goroutine 会被阻塞,直到锁被释放。Mutex 用于保护共享资源,防止数据竞争(Data Race)。
基本用法
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock() // 获取锁
defer c.mu.Unlock() // 确保释放锁
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
var counter Counter
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("最终值:", counter.Value()) // 1000
}Mutex 的常见陷阱
- 忘记解锁:导致死锁。始终使用
defer mu.Unlock() - 锁的粒度太大:过度加锁会降低并发性能
- 复制 Mutex:
Mutex不能被复制(包含noCopy标记),复制后会导致锁失效 - 未导出字段使用锁:将 Mutex 和被保护的数据放在同一个结构体中,且 Mutex 为未导出字段
锁粒度的控制
type SafeMap struct {
mu sync.Mutex
m map[string]int
}
// ✅ 好的做法:锁粒度尽量小
func (sm *SafeMap) Get(key string) (int, bool) {
sm.mu.Lock()
defer sm.mu.Unlock()
val, ok := sm.m[key]
return val, ok
}
func (sm *SafeMap) Set(key string, value int) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.m[key] = value
}
// ❌ 不好的做法:锁粒度太大
func (sm *SafeMap) BatchSet(items map[string]int) {
sm.mu.Lock()
defer sm.mu.Unlock()
// 执行大量耗时操作...
for k, v := range items {
sm.m[k] = v
}
}sync.RWMutex
sync.RWMutex 是一种更高效的锁,它区分读操作和写操作。多个 Goroutine 可以同时持有读锁(RLock),但写锁(Lock)是独占的。适用于读多写少的场景,能显著提升并发读性能。
基本用法
package main
import (
"fmt"
"sync"
"time"
)
type ThreadSafeCache struct {
mu sync.RWMutex
data map[string]string
}
func NewThreadSafeCache() *ThreadSafeCache {
return &ThreadSafeCache{
data: make(map[string]string),
}
}
// 读操作:使用 RLock(允许多个读者同时读取)
func (c *ThreadSafeCache) Get(key string) (string, bool) {
c.mu.RLock() // 读锁
defer c.mu.RUnlock() // 释放读锁
val, ok := c.data[key]
return val, ok
}
// 写操作:使用 Lock(独占)
func (c *ThreadSafeCache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := NewThreadSafeCache()
var wg sync.WaitGroup
// 启动多个读操作(可以同时进行)
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
val, ok := cache.Get("key")
if ok {
fmt.Printf("读者 %d: %s\n", id, val)
} else {
fmt.Printf("读者 %d: key 不存在\n", id)
}
}(i)
}
// 启动写操作(独占)
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(50 * time.Millisecond)
cache.Set("key", "hello")
fmt.Println("写入者: 设置 key = hello")
}()
wg.Wait()
}Mutex vs RWMutex
| 特性 | Mutex | RWMutex |
|---|---|---|
| 读-读 | 互斥(串行) | 并行(同时) |
| 读-写 | 互斥 | 互斥 |
| 写-写 | 互斥 | 互斥 |
| 性能 | 简单场景更快 | 读多写少场景更快 |
| 开销 | 更低 | 略高(内部维护读者计数) |
| 适用场景 | 读写频率相近 | 读多写少 |
何时使用 RWMutex?
当读操作远多于写操作(比如读:写 > 10:1),且读操作耗时较长时,使用 RWMutex 能显著提升性能。如果读写频率相近或写操作很多,Mutex 可能更简单高效。
sync.Once
sync.Once 保证一个函数只会被执行一次,无论被多少个 Goroutine 调用。它常用于单例模式、延迟初始化等场景。Once 是并发安全的,内部通过原子操作和互斥锁实现。
基本用法
package main
import (
"fmt"
"sync"
)
var (
instance *Config
once sync.Once
)
type Config struct {
DatabaseURL string
Port int
}
func GetConfig() *Config {
once.Do(func() {
fmt.Println("初始化配置...(只执行一次)")
instance = &Config{
DatabaseURL: "postgres://localhost:5432/mydb",
Port: 8080,
}
})
return instance
}
func main() {
var wg sync.WaitGroup
// 10 个 Goroutine 同时尝试获取配置
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
cfg := GetConfig()
fmt.Printf("Goroutine %d: port=%d\n", id, cfg.Port)
}(i)
}
wg.Wait()
}
// 输出:
// 初始化配置...(只执行一次)
// Goroutine 3: port=8080
// Goroutine 7: port=8080
// ...(所有 Goroutine 共享同一个 instance)sync.Once 的注意事项
once.Do(f)中的函数f如果 panic,后续调用once.Do仍会重新执行sync.Once不能被复制- 如果初始化函数需要返回值,应使用闭包捕获外部变量
sync.Once只有零值可用,不能new(sync.Once)后赋值给其他变量
实现单例模式
package singleton
import "sync"
type Database struct {
// 数据库连接配置
}
var (
db *Database
once sync.Once
)
func GetDatabase() *Database {
once.Do(func() {
db = &Database{}
// 初始化连接...
})
return db
}sync.Map
sync.Map 是 Go 内置的并发安全映射,专为两种场景优化:
- 键值对只写一次但多次读取(如缓存)
- 多个 Goroutine 读写不同的键(不重叠的键集合)
在大多数场景下,使用 map + sync.RWMutex 性能更好。sync.Map 只有在上述特定场景下才比手动加锁更高效。
基本用法
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// Store: 存储键值对
m.Store("name", "Alice")
m.Store("age", 30)
m.Store(42, "answer")
// Load: 读取键值对
if name, ok := m.Load("name"); ok {
fmt.Println("name:", name) // Alice
}
// LoadOrStore: 如果 key 存在则返回现有值,否则存储并返回新值
actual, loaded := m.LoadOrStore("name", "Bob")
fmt.Printf("actual=%v, loaded=%v\n", actual, loaded) // actual=Alice, loaded=true
// LoadAndDelete: 获取并删除
if age, ok := m.LoadAndDelete("age"); ok {
fmt.Println("删除 age:", age) // 30
}
// Delete: 删除指定键
m.Delete(42)
// Range: 遍历所有键值对
m.Store("city", "Beijing")
m.Store("country", "China")
m.Range(func(key, value any) bool {
fmt.Printf(" %v: %v\n", key, value)
return true // 返回 false 停止遍历
})
// 输出:
// name: Alice
// city: Beijing
// country: China
}sync.Map vs map + RWMutex
sync.Map适用于键稳定(写一次读多次)或键不重叠的场景- 对于频繁增删改的场景,
map+RWMutex通常性能更好 sync.Map的Range操作会获取全局锁,遍历效率不如普通mapsync.Map是类型不安全的,键和值都是any类型- 大多数情况下,推荐使用
map+Mutex/RWMutex
sync.Pool
sync.Pool 是一个临时对象池,用于缓存和复用临时对象,减少内存分配和 GC 压力。Pool 中的对象是临时的——GC 时可能被自动清理。它适用于频繁创建和销毁的短生命周期对象,如 []byte 缓冲区。
基本用法
package main
import (
"fmt"
"sync"
)
var bufPool = sync.Pool{
New: func() any {
// 当池中没有可用对象时调用
fmt.Println("创建新的缓冲区")
return make([]byte, 0, 1024) // 初始容量 1024
},
}
func process(data string) {
// 从池中获取缓冲区
buf := bufPool.Get().([]byte)
defer bufPool.Put(buf) // 用完归还到池中
// 使用缓冲区
buf = append(buf[:0], data...) // 重置后使用
fmt.Printf("处理数据: %s, 缓冲区容量: %d\n", string(buf), cap(buf))
}
func main() {
for i := 0; i < 5; i++ {
process(fmt.Sprintf("任务-%d", i))
}
}
// 输出:
// 创建新的缓冲区 ← 第一次从池中获取,需要新建
// 处理数据: 任务-0, 缓冲区容量: 1024
// 处理数据: 任务-1, 缓冲区容量: 1024 ← 从池中复用,不再创建
// 处理数据: 任务-2, 缓冲区容量: 1024
// 处理数据: 任务-3, 缓冲区容量: 1024
// 处理数据: 任务-4, 缓冲区容量: 1024sync.Pool 的注意事项
- GC 会清理 Pool:Pool 中的对象在 GC 时可能被回收,不能保证对象永远存在
- 归还前必须重置:
Put前应清空对象状态,避免数据泄漏 - 不适合做连接池:Pool 不适合保存数据库连接等长生命周期资源
- 不要假设 Get 返回的对象状态:Get 可能返回新对象或被其他 Goroutine 使用过的对象
- 适合高频率小对象的场景:如 JSON 编码的缓冲区、HTTP 请求的临时数据
实际应用:JSON 编码缓冲区
var jsonBufPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
func MarshalToBytes(v any) ([]byte, error) {
buf := jsonBufPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset() // 重置缓冲区
jsonBufPool.Put(buf) // 归还到池中
}()
buf.Reset()
if err := json.NewEncoder(buf).Encode(v); err != nil {
return nil, err
}
return bytes.TrimSpace(buf.Bytes()), nil
}sync/atomic 原子操作
原子操作是不可被中断的单个操作,要么完全执行成功,要么完全不执行。sync/atomic 包提供了对整数、指针等类型进行原子操作的方法,性能优于使用锁。适合简单的计数器、标志位等场景。
基本操作
package main
import (
"fmt"
"sync/atomic"
)
func main() {
var counter int64 = 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子递增
}()
}
wg.Wait()
fmt.Println("原子计数器:", counter) // 1000
// 原子加载
val := atomic.LoadInt64(&counter)
// 原子存储
atomic.StoreInt64(&counter, 42)
// 原子交换(返回旧值)
old := atomic.SwapInt64(&counter, 100)
fmt.Println("交换前:", old) // 42
// 原子比较并交换(CAS)
swapped := atomic.CompareAndSwapInt64(&counter, 100, 200)
fmt.Println("CAS 成功:", swapped) // true
fmt.Println("当前值:", counter) // 200
}atomic.Value
atomic.Value 提供了原子加载和存储任意类型的值。它要求存储的类型必须一致,否则会 panic。常用于配置更新等场景。
package main
import (
"fmt"
"sync/atomic"
)
type AppConfig struct {
MaxConn int
Timeout int
}
func main() {
var config atomic.Value
// 初始配置
config.Store(AppConfig{MaxConn: 100, Timeout: 30})
// 原子读取
cfg := config.Load().(AppConfig)
fmt.Printf("配置: MaxConn=%d, Timeout=%d\n", cfg.MaxConn, cfg.Timeout)
// 原子更新(另一个 Goroutine 中)
go func() {
config.Store(AppConfig{MaxConn: 200, Timeout: 60})
}()
// 读取最新配置
newCfg := config.Load().(AppConfig)
fmt.Printf("新配置: MaxConn=%d, Timeout=%d\n", newCfg.MaxConn, newCfg.Timeout)
}atomic.Value 的类型约束
atomic.Value 存储的值必须是同一类型。第一次 Store 确定了类型,后续 Store 的类型必须相同,否则 panic:
var v atomic.Value
v.Store(42) // 确定类型为 int
v.Store("hello") // panic: inconsistent typesGo 1.19+ 的原子类型
Go 1.19 引入了新的类型化原子操作,替代了旧的函数式 API:
package main
import (
"fmt"
"sync/atomic"
)
func main() {
// Go 1.19+ 新 API
var counter atomic.Int64
counter.Add(1)
counter.Add(1)
fmt.Println(counter.Load()) // 2
counter.Store(100)
fmt.Println(counter.Swap(200)) // 100
fmt.Println(counter.Load()) // 200
// atomic.Bool
var flag atomic.Bool
flag.Store(true)
fmt.Println(flag.Load()) // true
flag.Toggle()
fmt.Println(flag.Load()) // false
// atomic.Pointer(Go 1.19+)
var ptr atomic.Pointer[string]
str := "hello"
ptr.Store(&str)
fmt.Println(*ptr.Load()) // hello
}竞态检测器:go run -race
竞态条件是指多个 Goroutine 同时访问共享数据,且至少一个访问是写操作,导致程序的输出依赖于 Goroutine 的执行顺序。竞态条件是并发编程中最常见也最难调试的 bug 之一。
使用 race 检测器
Go 内置了竞态检测器,通过 -race 标志启用:
go run -race main.go
go test -race ./...
go build -race -o myapp检测竞态的示例
// race_example.go
package main
import (
"fmt"
"sync"
)
func main() {
var counter int
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // ❌ 数据竞争!多个 goroutine 同时写入
}()
}
wg.Wait()
fmt.Println("counter =", counter)
}$ go run -race race_example.go
==================
WARNING: DATA RACE
Write at 0x00c0000b0008 by goroutine 7:
main.main.func1()
/path/race_example.go:14 +0x3e
...
Previous write at 0x00c0000b0008 by goroutine 6:
main.main.func1()
/path/race_example.go:14 +0x3e
...
==================
Found 1 data race(s)修复竞态
// ✅ 方法 1:使用 Mutex
var mu sync.Mutex
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
// ✅ 方法 2:使用 atomic
func increment() {
atomic.AddInt64(&counter, 1)
}
// ✅ 方法 3:使用 Channel(CSP 模型)
func increment(ch chan int) {
ch <- 1
}竞态检测器的使用建议
- CI/CD 中使用:在测试命令中加入
-race标志 - 性能开销:race 检测器会使程序慢 5-10 倍,内存开销增加 5-10 倍
- 仅用于开发和测试:不要在生产环境使用
-race编译 - 不能保证检测到所有竞态:race 检测器是动态分析工具,只能检测实际执行路径上的竞态
- 结合代码审查:race 检测器是辅助工具,不能替代良好的并发设计
练习题
练习 1:并发安全的计数器
实现一个 SafeCounter 结构体,要求:
- 使用
sync/atomic包实现Increment()、Decrement()、Value()方法 - 支持
Reset()将计数器归零 - 编写测试验证 1000 个 Goroutine 并发递增的正确性
代码:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type SafeCounter struct {
value int64
}
func (c *SafeCounter) Increment() {
atomic.AddInt64(&c.value, 1)
}
func (c *SafeCounter) Decrement() {
atomic.AddInt64(&c.value, -1)
}
func (c *SafeCounter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *SafeCounter) Reset() {
atomic.StoreInt64(&c.value, 0)
}
func main() {
var counter SafeCounter
var wg sync.WaitGroup
// 1000 个 Goroutine 并发递增
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("递增后:", counter.Value()) // 1000
// 500 个 Goroutine 并发递减
for i := 0; i < 500; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Decrement()
}()
}
wg.Wait()
fmt.Println("递减后:", counter.Value()) // 500
counter.Reset()
fmt.Println("重置后:", counter.Value()) // 0
// 验证:使用 race 检测器运行
// go run -race main.go
}验证:
$ go run -race main.go
递增后: 1000
递减后: 500
重置后: 0无 race warning 说明实现是并发安全的。
练习 2:sync.Pool 缓冲区复用
使用 sync.Pool 实现一个高效的字符串拼接器。要求:
- 使用
bytes.Buffer作为池化对象 - 实现
Join(parts []string) string方法,从池中获取 buffer、拼接字符串、归还 buffer - 对比使用 Pool 和不使用 Pool 的性能差异
代码:
package main
import (
"bytes"
"fmt"
"sync"
)
var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
func Join(parts ...string) string {
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufferPool.Put(buf)
}()
buf.Reset()
for i, part := range parts {
if i > 0 {
buf.WriteByte(',')
}
buf.WriteString(part)
}
return buf.String()
}
func JoinWithoutPool(parts ...string) string {
buf := new(bytes.Buffer)
for i, part := range parts {
if i > 0 {
buf.WriteByte(',')
}
buf.WriteString(part)
}
return buf.String()
}
func main() {
parts := make([]string, 1000)
for i := range parts {
parts[i] = fmt.Sprintf("item-%d", i)
}
// 使用 Pool
fmt.Println("使用 Pool:")
result := Join(parts...)
fmt.Println("长度:", len(result))
// 不使用 Pool
fmt.Println("不使用 Pool:")
result2 := JoinWithoutPool(parts...)
fmt.Println("长度:", len(result2))
// 性能对比
iterations := 10000
fmt.Printf("\n性能测试(%d 次迭代):\n", iterations)
start := time.Now()
for i := 0; i < iterations; i++ {
Join(parts...)
}
fmt.Printf("使用 Pool: %v\n", time.Since(start))
start = time.Now()
for i := 0; i < iterations; i++ {
JoinWithoutPool(parts...)
}
fmt.Printf("不使用 Pool: %v\n", time.Since(start))
}典型输出:
使用 Pool:
长度: 8988
不使用 Pool:
长度: 8988
性能测试(10000 次迭代):
使用 Pool: 8.5ms
不使用 Pool: 15.2ms分析:使用 sync.Pool 减少了内存分配次数和 GC 压力,在高频调用场景下性能提升明显。
练习 3:RWMutex 读多写少场景
实现一个 ThreadSafeList,使用 sync.RWMutex 保护一个切片。要求实现 Append(val int)、Get(index int) (int, error)、Len() int、Snapshot() []int 方法。对比使用 Mutex 和 RWMutex 在”100 个并发读 + 1 个并发写”场景下的性能差异。
代码:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type ThreadSafeList struct {
mu sync.RWMutex
data []int
}
func NewThreadSafeList() *ThreadSafeList {
return &ThreadSafeList{data: make([]int, 0)}
}
func (l *ThreadSafeList) Append(val int) {
l.mu.Lock()
defer l.mu.Unlock()
l.data = append(l.data, val)
}
func (l *ThreadSafeList) Get(index int) (int, error) {
l.mu.RLock()
defer l.mu.RUnlock()
if index < 0 || index >= len(l.data) {
return 0, fmt.Errorf("索引越界: %d", index)
}
return l.data[index], nil
}
func (l *ThreadSafeList) Len() int {
l.mu.RLock()
defer l.mu.RUnlock()
return len(l.data)
}
func (l *ThreadSafeList) Snapshot() []int {
l.mu.RLock()
defer l.mu.RUnlock()
snapshot := make([]int, len(l.data))
copy(snapshot, l.data)
return snapshot
}
func benchmarkRW(numReaders, iterations int) {
list := NewThreadSafeList()
var wg sync.WaitGroup
var writeCounter atomic.Int64
// 预填充一些数据
for i := 0; i < 100; i++ {
list.Append(i)
}
start := time.Now()
// 启动写操作
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
list.Append(i)
writeCounter.Add(1)
}
}()
// 启动读操作
for r := 0; r < numReaders; r++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for i := 0; i < iterations; i++ {
list.Get(i % list.Len())
}
}(r)
}
wg.Wait()
fmt.Printf("RWMutex - 读者数: %d, 迭代: %d, 写入: %d, 耗时: %v\n",
numReaders, iterations, writeCounter.Load(), time.Since(start))
}
func main() {
fmt.Println("RWMutex 读多写少性能测试")
fmt.Println("============================")
// 100 个并发读 + 1 个并发写
benchmarkRW(100, 10000)
// 10 个并发读 + 1 个并发写
benchmarkRW(10, 10000)
}典型输出:
RWMutex 读多写少性能测试
============================
RWMutex - 读者数: 100, 迭代: 10000, 写入: 10000, 耗时: 12ms
RWMutex - 读者数: 10, 迭代: 10000, 写入: 10000, 耗时: 8ms关键点:RWMutex 允许读操作并行执行,在读者数量增多时,相比 Mutex 的串行读,性能优势更明显。
