导航菜单

Channel 底层实现

🔴 困难

题目描述

解释 Go channel 的底层实现原理。无缓冲 channel 和有缓冲 channel 的发送/接收操作有什么区别?

示例代码

// 无缓冲 channel
ch1 := make(chan int)
ch1 <- 1      // 阻塞直到有接收者
val := <-ch1  // 阻塞直到有发送者

// 有缓冲 channel
ch2 := make(chan int, 10)
ch2 <- 1      // 缓冲未满时不阻塞
val := <-ch2  // 缓冲不为空时不阻塞

提示

  • channel 的底层结构是 hchan
  • 发送和接收操作会涉及等待队列
  • 需要考虑锁的竞争

解法

参考答案 (3 个标签)
channel 底层结构 并发

底层结构

type hchan struct {
    qcount   uint           // 缓冲区中元素个数
    dataqsiz uint           // 缓冲区大小
    buf      unsafe.Pointer // 缓冲区指针(环形队列)
    elemsize uint16         // 元素大小
    closed   uint32         // 是否关闭
    elemtype *_type         // 元素类型
    sendx    uint           // 发送索引
    recvx    uint           // 接收索引
    recvq    waitq          // 接收等待队列
    sendq    waitq          // 发送等待队列
    lock     mutex          // 保护所有字段
}

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    g        *goroutine
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer
    acquiretime int64
    releasetime int64
    ticket      uint32
    isSelect    bool
}

图解

hchan 结构(有缓冲,容量为 3):

┌─────────────────────────────────┐
│ qcount   = 2                    │  当前元素个数
│ dataqsiz = 3                    │  缓冲区大小
│ buf      = [1, 2, _]            │  环形缓冲区
│ sendx    = 2                    │  下次发送位置
│ recvx    = 0                    │  下次接收位置
│ recvq    = []                   │  接收队列
│ sendq    = [g1, g2]             │  发送队列(2 个阻塞的发送者)
└─────────────────────────────────┘

环形缓冲区示意图:

     0     1     2
   ┌────┬────┬────┐
   │ 1  │ 2  │ _  │
   └────┴────┴────┘
    ↑          ↑
  recvx      sendx
  (下次接收) (下次发送)

发送操作流程

无缓冲 Channel

ch <- value

// 伪代码
func chansend(ch *hchan, ep unsafe.Pointer) {
    lock(&ch.lock)
    
    // 1. 如果有等待的接收者,直接发送
    if !empty(ch.recevq) {
        sg := dequeue(&ch.recevq)
        memcpy(sg.elem, ep, ch.elemsize)
        unlock(&ch.lock)
        goready(sg.g) // 唤醒接收者
        return
    }
    
    // 2. 无缓冲且无接收者,加入发送队列
    sg := acquireSudog()
    sg.elem = ep
    enqueue(&ch.sendq, sg)
    
    // 3. 阻塞当前 goroutine
    goparkunlock(&ch.lock, "chan send")
}

有缓冲 Channel

ch <- value

// 伪代码
func chansend(ch *hchan, ep unsafe.Pointer) {
    lock(&ch.lock)
    
    // 1. 如果有等待的接收者,直接发送
    if !empty(ch.recevq) {
        sg := dequeue(&ch.recevq)
        memcpy(sg.elem, ep, ch.elemsize)
        unlock(&ch.lock)
        goready(sg.g)
        return
    }
    
    // 2. 如果缓冲区未满,写入缓冲区
    if ch.qcount < ch.dataqsiz {
        memcpy(ch.buf[ch.sendx], ep, ch.elemsize)
        ch.sendx = (ch.sendx + 1) % ch.dataqsiz
        ch.qcount++
        unlock(&ch.lock)
        return
    }
    
    // 3. 缓冲区已满,加入发送队列
    sg := acquireSudog()
    sg.elem = ep
    enqueue(&ch.sendq, sg)
    goparkunlock(&ch.lock, "chan send")
}

接收操作流程

val := <-ch

// 伪代码
func chanrecv(ch *hchan, ep unsafe.Pointer) {
    lock(&ch.lock)
    
    // 1. 如果有等待的发送者
    if !empty(ch.sendq) {
        sg := dequeue(&ch.sendq)
        memcpy(ep, sg.elem, ch.elemsize)
        
        // 如果是有缓冲 channel,从缓冲区复制到发送者
        if ch.dataqsiz > 0 {
            memcpy(ep, &ch.buf[ch.recvx], ch.elemsize)
            memcpy(&ch.buf[ch.recvx], sg.elem, ch.elemsize)
            ch.recvx = (ch.recvx + 1) % ch.dataqsiz
        }
        
        unlock(&ch.lock)
        goready(sg.g)
        return
    }
    
    // 2. 如果缓冲区有数据
    if ch.qcount > 0 {
        memcpy(ep, &ch.buf[ch.recvx], ch.elemsize)
        ch.recvx = (ch.recvx + 1) % ch.dataqsiz
        ch.qcount--
        unlock(&ch.lock)
        return
    }
    
    // 3. 无数据可接收,加入接收队列
    sg := acquireSudog()
    sg.elem = ep
    enqueue(&ch.recevq, sg)
    goparkunlock(&ch.lock, "chan receive")
}

关键点

  1. 锁保护:所有操作都持有锁
  2. 直接传递:有等待队列时,直接拷贝数据(零拷贝)
  3. 环形缓冲区:使用 sendx 和 recvx 实现环形队列
  4. 等待队列:sendq 和 recvq 存储阻塞的 goroutine

扩展:Select 实现

select {
case v := <-ch1:
    fmt.Println(v)
case ch2 <- 1:
    fmt.Println("sent")
case <-time.After(time.Second):
    fmt.Println("timeout")
}

Select 实现原理

// 伪代码
func selectgo(cas []scase) (int, bool) {
    // 1. 随机打乱 case 顺序(避免饥饿)
    shuffle(cas)
    
    // 2. 顺序检查每个 case
    for i, c := range cas {
        if c.isRecv && !empty(c.ch.recvq) {
            return i, true // 可接收
        }
        if c.isSend && !empty(c.ch.sendq) {
            return i, true // 可发送
        }
    }
    
    // 3. 没有可执行的 case
    if hasDefault {
        return defaultIndex, false // 执行 default
    }
    
    // 4. 加入所有 case 的等待队列
    for i, c := range cas {
        enqueue(&c.ch.waitq, getg())
    }
    
    // 5. 阻塞,直到某个 case 就绪
    gopark()
    
    // 6. 被唤醒后,返回执行的 case
    return selectedCase, true
}

搜索