导航菜单

Sync 包与并发原语

Go 的 sync 包提供了多种并发原语,用于协调 goroutine 之间的同步和共享数据的访问。

Mutex(互斥锁)

基本使用

var mu sync.Mutex
var count int

func increment() {
    mu.Lock()
    defer mu.Unlock()
    count++
}

Mutex 底层原理

type Mutex struct {
    state int32  // 状态位
    sema  uint32 // 信号量
}

// state 三个位:
// - 第 0 位:locked(是否锁定)
// - 第 1 位:woken(是否有唤醒的 goroutine)
// - 第 2-31 位:waiter(等待的 goroutine 数量)

锁的两种模式:

  1. 正常模式(Normal Mode)

    • 等待者按 FIFO 顺序唤醒
    • 被唤醒的 goroutine 与新来的 goroutine 竞争
    • 新来的更容易成功(还在 CPU 上运行)
    • 可能导致”饥饿”,但有更好的吞吐量
  2. 饥饿模式(Starvation Mode)

    • 锁直接交给等待队列队头
    • 新来的 goroutine 不能抢锁
    • 当等待时间超过 1ms 时触发
    • 当等待者只剩 1 个时退出

面试重点:为什么 Mutex 不能复制?

type Counter struct {
    mu sync.Mutex
    count int
}

// ❌ 错误:Mutex 被复制
func (c Counter) Increment() { // 值接收者,复制了 Mutex
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

// ✅ 正确:使用指针接收者
func (c *Counter) Increment() {
    c.mu.Lock()
    c.count++
    c.mu.Unlock()
}

Mutex 包含状态字段,复制后状态不一致,可能导致死锁或数据竞争。

RWMutex(读写锁)

var rw sync.RWMutex
var data map[string]string

func read(key string) string {
    rw.RLock()
    defer rw.RUnlock()
    return data[key]
}

func write(key, value string) {
    rw.Lock()
    defer rw.Unlock()
    data[key] = value
}

RWMutex 特性:

  • 读锁:多个 goroutine 可以同时持有
  • 写锁:独占,阻塞所有读写锁
  • 读锁优先:写锁可能饥饿
  • 适用场景:读多写少

注意事项:

// ❌ 错误:RWMutex 嵌套
func nested() {
    rw.RLock()
    // rw.RLock() // 可重入,但 Go 不支持
    rw.Lock() // 死锁!
    rw.Unlock()
    rw.RUnlock()
}

// ❌ 错误:Lock 中嵌套 RLock
func nested2() {
    rw.Lock()
    rw.RLock() // 死锁!
    rw.RUnlock()
    rw.Unlock()
}

WaitGroup

基本使用

var wg sync.WaitGroup

func worker(id int) {
    defer wg.Done()
    fmt.Printf("worker %d\n", id)
}

func main() {
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i)
    }
    wg.Wait() // 等待所有 worker 完成
}

WaitGroup 底层原理

type WaitGroup struct {
    noCopy noCopy        // 禁止复制
    state atomic.Uint64  // 高 32 位:counter,低 32 位:waiter
    sema  uint32         // 信号量
}

操作流程:

  1. Add(delta):counter += delta
  2. Done():Add(-1),即 counter -= 1
  3. Wait()
    • counter > 0:waiter++,阻塞
    • counter == 0:唤醒所有 waiter

面试重点:Add 时机

// ✅ 正确:在 goroutine 外 Add
wg.Add(1)
go func() {
    defer wg.Done()
    // ...
}()

// ❌ 错误:在 goroutine 内 Add
go func() {
    wg.Add(1) // 可能已经 Wait 了
    defer wg.Done()
    // ...
}()
wg.Wait() // 可能 Add 还没执行

Once

基本使用

var once sync.Once
var instance *Singleton

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{}
    })
    return instance
}

Once 底层原理

type Once struct {
    done atomic.Uint32 // 0: 未执行, 1: 已执行
    m    Mutex
}

func (o *Once) Do(f func()) {
    // 快速路径:原子检查
    if o.done.Load() == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done.Load() == 0 {
        defer o.done.Store(1)
        f()
    }
}

面试重点:Once 保证什么?

  • 保证函数只执行一次
  • 多个 goroutine 同时调用,只有一个执行
  • 执行时阻塞其他调用
  • 执行完成后,其他调用直接返回

常见问题:

// ❌ 错误:Do 中 panic
var once sync.Once
once.Do(func() {
    panic("error")
})
// 再次调用 Once.Do 不会再执行 f

// ✅ 解决:使用 recover
once.Do(func() {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("panic: %v", r)
        }
    }()
    // ...
})

Cond(条件变量)

var (
    mu    sync.Mutex
    cond  = sync.NewCond(&mu)
    ready bool
)

func worker() {
    mu.Lock()
    for !ready {
        cond.Wait() // 等待信号
    }
    fmt.Println("worker running")
    mu.Unlock()
}

func main() {
    go worker()
    
    time.Sleep(time.Second)
    mu.Lock()
    ready = true
    cond.Signal() // 唤醒一个
    mu.Unlock()
}

Cond 方法:

  • Wait():释放锁,等待信号,收到信号后重新获取锁
  • Signal():唤醒一个等待者
  • Broadcast():唤醒所有等待者

使用场景:

  • 生产者-消费者(单个/多个)
  • 任务完成通知
  • 状态变化通知

Map(并发安全 Map)

基本使用

var m sync.Map

// 存储
m.Store("key", "value")

// 读取
if val, ok := m.Load("key"); ok {
    fmt.Println(val)
}

// 删除
m.Delete("key")

// 遍历
m.Range(func(key, value interface{}) bool {
    fmt.Println(key, value)
    return true
})

Map 底层原理

type Map struct {
    mu     Mutex          // 保护 read
    read   atomic.Value   // readOnly(只读,atomic)
    dirty  map[any]*entry // 脏数据(需加锁)
    misses int            // 脏数据命中计数
}

type entry struct {
    p unsafe.Pointer // 指向实际数据的指针
}

type readOnly struct {
    m       map[any]*entry
    amended bool // true 表示 dirty 有 m 中没有的 key
}

工作流程:

  1. 读操作

    • 先查 read(atomic,无锁)
    • 未命中且 amended=true,查 dirty(加锁)
    • 命中 dirty,misses++,达到阈值时提升 dirty 到 read
  2. 写操作

    • key 在 read 且未删除:CAS 更新
    • key 在 dirty 或 read 被删除:加锁更新 dirty
    • key 不存在:加锁写入 dirty
  3. 删除操作

    • read 中:原子标记为 nil
    • dirty 中:直接删除

适用场景:

  • 读多写少
  • key set 稳定(不频繁增删)
  • 不需要与 map[string]interface 等类型转换

不适用场景:

  • 大量写操作(性能不如加锁的 map)
  • 频繁增删 key(dirty 提升频繁)

Pool(对象池)

基本使用

var pool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 1024)
    },
}

func process() {
    buf := pool.Get().([]byte)
    defer pool.Put(buf)
    
    // 使用 buf
    // ...
}

Pool 底层原理

type Pool struct {
    noCopy noCopy
    local     unsafe.Pointer // [P]poolLocal
    localSize uintptr        // []poolLocal 的大小
    victim     unsafe.Pointer // 上一次的 local
    victimSize uintptr        // 上一次的 localSize
    New func() interface{}
}

type poolLocal struct {
    poolLocalInternal
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

type poolLocalInternal struct {
    private interface{}   // 当前 P 的私有对象
    shared  []interface{} // 当前 P 的共享对象队列
    Mutex               // 保护 shared
}

工作流程:

  1. Get()

    • 先从当前 P 的 private 获取
    • 再从当前 P 的 shared 获取(加锁)
    • 再从其他 P 的 shared 偷取(加锁)
    • 再从 victim 获取(上一次 GC 的 local)
    • 最后调用 New 创建
  2. Put()

    • 放入当前 P 的 private(如果为空)
    • 否则放入当前 P 的 shared

GC 行为:

  • 每次 GC 时,victim 变为新的 local
  • 原 local 变为新的 victim
  • victim 中的对象会被回收(避免内存泄露)

适用场景:

  • 复用临时对象(减少 GC 压力)
  • 缓存昂贵对象(连接、缓冲区等)
  • 不适用:保存连接池(连接可能已失效)

Atomic(原子操作)

var count int64

// 加
atomic.AddInt64(&count, 1)

// 加载
val := atomic.LoadInt64(&count)

// 存储
atomic.StoreInt64(&count, 100)

// 比较并交换
atomic.CompareAndSwapInt64(&count, 100, 200)

// 交换
old := atomic.SwapInt64(&count, 0)

atomic.Value(任意类型原子操作):

var config atomic.Value

config.Store(Config{Host: "localhost"})

c := config.Load().(Config)

原子操作 vs 锁:

特性原子操作
性能更高较低
功能单变量复杂逻辑
适用简单计数多变量/复杂操作

常见面试题

1. Mutex vs channel

场景MutexChannel
共享数据
通信
锁竞争可能有无(通过传递)
编程模型共享内存CSP

2. 如何检测数据竞争

# 运行时检测
go run -race main.go

# 测试时检测
go test -race ./...

# 构建时检测
go build -race

3. sync.Map vs map + Mutex

  • sync.Map:读多写少,key 稳定
  • map + Mutex:写多,key 变化频繁

4. Pool 的 Get 可能返回 nil 吗?

  • New 为 nil,且池为空时,返回 nil
  • 建议始终设置 New 函数

练习题

  1. 实现一个线程安全的计数器
  2. 实现一个简单的限流器(令牌桶)
  3. 使用 sync.Pool 优化字符串拼接
  4. 实现一个可取消的任务队列

搜索