Sync 包与并发原语
Go 的 sync 包提供了多种并发原语,用于协调 goroutine 之间的同步和共享数据的访问。
Mutex(互斥锁)
基本使用
var mu sync.Mutex
var count int
func increment() {
mu.Lock()
defer mu.Unlock()
count++
}Mutex 底层原理
type Mutex struct {
state int32 // 状态位
sema uint32 // 信号量
}
// state 三个位:
// - 第 0 位:locked(是否锁定)
// - 第 1 位:woken(是否有唤醒的 goroutine)
// - 第 2-31 位:waiter(等待的 goroutine 数量)锁的两种模式:
正常模式(Normal Mode):
- 等待者按 FIFO 顺序唤醒
- 被唤醒的 goroutine 与新来的 goroutine 竞争
- 新来的更容易成功(还在 CPU 上运行)
- 可能导致”饥饿”,但有更好的吞吐量
饥饿模式(Starvation Mode):
- 锁直接交给等待队列队头
- 新来的 goroutine 不能抢锁
- 当等待时间超过 1ms 时触发
- 当等待者只剩 1 个时退出
面试重点:为什么 Mutex 不能复制?
type Counter struct {
mu sync.Mutex
count int
}
// ❌ 错误:Mutex 被复制
func (c Counter) Increment() { // 值接收者,复制了 Mutex
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// ✅ 正确:使用指针接收者
func (c *Counter) Increment() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}Mutex 包含状态字段,复制后状态不一致,可能导致死锁或数据竞争。
RWMutex(读写锁)
var rw sync.RWMutex
var data map[string]string
func read(key string) string {
rw.RLock()
defer rw.RUnlock()
return data[key]
}
func write(key, value string) {
rw.Lock()
defer rw.Unlock()
data[key] = value
}RWMutex 特性:
- 读锁:多个 goroutine 可以同时持有
- 写锁:独占,阻塞所有读写锁
- 读锁优先:写锁可能饥饿
- 适用场景:读多写少
注意事项:
// ❌ 错误:RWMutex 嵌套
func nested() {
rw.RLock()
// rw.RLock() // 可重入,但 Go 不支持
rw.Lock() // 死锁!
rw.Unlock()
rw.RUnlock()
}
// ❌ 错误:Lock 中嵌套 RLock
func nested2() {
rw.Lock()
rw.RLock() // 死锁!
rw.RUnlock()
rw.Unlock()
}WaitGroup
基本使用
var wg sync.WaitGroup
func worker(id int) {
defer wg.Done()
fmt.Printf("worker %d\n", id)
}
func main() {
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i)
}
wg.Wait() // 等待所有 worker 完成
}WaitGroup 底层原理
type WaitGroup struct {
noCopy noCopy // 禁止复制
state atomic.Uint64 // 高 32 位:counter,低 32 位:waiter
sema uint32 // 信号量
}操作流程:
- Add(delta):counter += delta
- Done():Add(-1),即 counter -= 1
- Wait():
- counter > 0:waiter++,阻塞
- counter == 0:唤醒所有 waiter
面试重点:Add 时机
// ✅ 正确:在 goroutine 外 Add
wg.Add(1)
go func() {
defer wg.Done()
// ...
}()
// ❌ 错误:在 goroutine 内 Add
go func() {
wg.Add(1) // 可能已经 Wait 了
defer wg.Done()
// ...
}()
wg.Wait() // 可能 Add 还没执行Once
基本使用
var once sync.Once
var instance *Singleton
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{}
})
return instance
}Once 底层原理
type Once struct {
done atomic.Uint32 // 0: 未执行, 1: 已执行
m Mutex
}
func (o *Once) Do(f func()) {
// 快速路径:原子检查
if o.done.Load() == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done.Load() == 0 {
defer o.done.Store(1)
f()
}
}面试重点:Once 保证什么?
- 保证函数只执行一次
- 多个 goroutine 同时调用,只有一个执行
- 执行时阻塞其他调用
- 执行完成后,其他调用直接返回
常见问题:
// ❌ 错误:Do 中 panic
var once sync.Once
once.Do(func() {
panic("error")
})
// 再次调用 Once.Do 不会再执行 f
// ✅ 解决:使用 recover
once.Do(func() {
defer func() {
if r := recover(); r != nil {
log.Printf("panic: %v", r)
}
}()
// ...
})Cond(条件变量)
var (
mu sync.Mutex
cond = sync.NewCond(&mu)
ready bool
)
func worker() {
mu.Lock()
for !ready {
cond.Wait() // 等待信号
}
fmt.Println("worker running")
mu.Unlock()
}
func main() {
go worker()
time.Sleep(time.Second)
mu.Lock()
ready = true
cond.Signal() // 唤醒一个
mu.Unlock()
}Cond 方法:
Wait():释放锁,等待信号,收到信号后重新获取锁Signal():唤醒一个等待者Broadcast():唤醒所有等待者
使用场景:
- 生产者-消费者(单个/多个)
- 任务完成通知
- 状态变化通知
Map(并发安全 Map)
基本使用
var m sync.Map
// 存储
m.Store("key", "value")
// 读取
if val, ok := m.Load("key"); ok {
fmt.Println(val)
}
// 删除
m.Delete("key")
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})Map 底层原理
type Map struct {
mu Mutex // 保护 read
read atomic.Value // readOnly(只读,atomic)
dirty map[any]*entry // 脏数据(需加锁)
misses int // 脏数据命中计数
}
type entry struct {
p unsafe.Pointer // 指向实际数据的指针
}
type readOnly struct {
m map[any]*entry
amended bool // true 表示 dirty 有 m 中没有的 key
}工作流程:
读操作:
- 先查 read(atomic,无锁)
- 未命中且 amended=true,查 dirty(加锁)
- 命中 dirty,misses++,达到阈值时提升 dirty 到 read
写操作:
- key 在 read 且未删除:CAS 更新
- key 在 dirty 或 read 被删除:加锁更新 dirty
- key 不存在:加锁写入 dirty
删除操作:
- read 中:原子标记为 nil
- dirty 中:直接删除
适用场景:
- 读多写少
- key set 稳定(不频繁增删)
- 不需要与 map[string]interface 等类型转换
不适用场景:
- 大量写操作(性能不如加锁的 map)
- 频繁增删 key(dirty 提升频繁)
Pool(对象池)
基本使用
var pool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
func process() {
buf := pool.Get().([]byte)
defer pool.Put(buf)
// 使用 buf
// ...
}Pool 底层原理
type Pool struct {
noCopy noCopy
local unsafe.Pointer // [P]poolLocal
localSize uintptr // []poolLocal 的大小
victim unsafe.Pointer // 上一次的 local
victimSize uintptr // 上一次的 localSize
New func() interface{}
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
private interface{} // 当前 P 的私有对象
shared []interface{} // 当前 P 的共享对象队列
Mutex // 保护 shared
}工作流程:
Get():
- 先从当前 P 的 private 获取
- 再从当前 P 的 shared 获取(加锁)
- 再从其他 P 的 shared 偷取(加锁)
- 再从 victim 获取(上一次 GC 的 local)
- 最后调用 New 创建
Put():
- 放入当前 P 的 private(如果为空)
- 否则放入当前 P 的 shared
GC 行为:
- 每次 GC 时,victim 变为新的 local
- 原 local 变为新的 victim
- victim 中的对象会被回收(避免内存泄露)
适用场景:
- 复用临时对象(减少 GC 压力)
- 缓存昂贵对象(连接、缓冲区等)
- 不适用:保存连接池(连接可能已失效)
Atomic(原子操作)
var count int64
// 加
atomic.AddInt64(&count, 1)
// 加载
val := atomic.LoadInt64(&count)
// 存储
atomic.StoreInt64(&count, 100)
// 比较并交换
atomic.CompareAndSwapInt64(&count, 100, 200)
// 交换
old := atomic.SwapInt64(&count, 0)atomic.Value(任意类型原子操作):
var config atomic.Value
config.Store(Config{Host: "localhost"})
c := config.Load().(Config)原子操作 vs 锁:
| 特性 | 原子操作 | 锁 |
|---|---|---|
| 性能 | 更高 | 较低 |
| 功能 | 单变量 | 复杂逻辑 |
| 适用 | 简单计数 | 多变量/复杂操作 |
常见面试题
1. Mutex vs channel
| 场景 | Mutex | Channel |
|---|---|---|
| 共享数据 | ✓ | ✗ |
| 通信 | ✗ | ✓ |
| 锁竞争 | 可能有 | 无(通过传递) |
| 编程模型 | 共享内存 | CSP |
2. 如何检测数据竞争
# 运行时检测
go run -race main.go
# 测试时检测
go test -race ./...
# 构建时检测
go build -race3. sync.Map vs map + Mutex
- sync.Map:读多写少,key 稳定
- map + Mutex:写多,key 变化频繁
4. Pool 的 Get 可能返回 nil 吗?
- New 为 nil,且池为空时,返回 nil
- 建议始终设置 New 函数
练习题
- 实现一个线程安全的计数器
- 实现一个简单的限流器(令牌桶)
- 使用 sync.Pool 优化字符串拼接
- 实现一个可取消的任务队列
