Channel 底层实现
🔴 困难题目描述
解释 Go channel 的底层实现原理。无缓冲 channel 和有缓冲 channel 的发送/接收操作有什么区别?
示例代码
// 无缓冲 channel
ch1 := make(chan int)
ch1 <- 1 // 阻塞直到有接收者
val := <-ch1 // 阻塞直到有发送者
// 有缓冲 channel
ch2 := make(chan int, 10)
ch2 <- 1 // 缓冲未满时不阻塞
val := <-ch2 // 缓冲不为空时不阻塞提示
- channel 的底层结构是
hchan - 发送和接收操作会涉及等待队列
- 需要考虑锁的竞争
解法
参考答案 (3 个标签)
channel 底层结构 并发
底层结构
type hchan struct {
qcount uint // 缓冲区中元素个数
dataqsiz uint // 缓冲区大小
buf unsafe.Pointer // 缓冲区指针(环形队列)
elemsize uint16 // 元素大小
closed uint32 // 是否关闭
elemtype *_type // 元素类型
sendx uint // 发送索引
recvx uint // 接收索引
recvq waitq // 接收等待队列
sendq waitq // 发送等待队列
lock mutex // 保护所有字段
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *goroutine
next *sudog
prev *sudog
elem unsafe.Pointer
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
}图解
hchan 结构(有缓冲,容量为 3):
┌─────────────────────────────────┐
│ qcount = 2 │ 当前元素个数
│ dataqsiz = 3 │ 缓冲区大小
│ buf = [1, 2, _] │ 环形缓冲区
│ sendx = 2 │ 下次发送位置
│ recvx = 0 │ 下次接收位置
│ recvq = [] │ 接收队列
│ sendq = [g1, g2] │ 发送队列(2 个阻塞的发送者)
└─────────────────────────────────┘
环形缓冲区示意图:
0 1 2
┌────┬────┬────┐
│ 1 │ 2 │ _ │
└────┴────┴────┘
↑ ↑
recvx sendx
(下次接收) (下次发送)发送操作流程
无缓冲 Channel
ch <- value
// 伪代码
func chansend(ch *hchan, ep unsafe.Pointer) {
lock(&ch.lock)
// 1. 如果有等待的接收者,直接发送
if !empty(ch.recevq) {
sg := dequeue(&ch.recevq)
memcpy(sg.elem, ep, ch.elemsize)
unlock(&ch.lock)
goready(sg.g) // 唤醒接收者
return
}
// 2. 无缓冲且无接收者,加入发送队列
sg := acquireSudog()
sg.elem = ep
enqueue(&ch.sendq, sg)
// 3. 阻塞当前 goroutine
goparkunlock(&ch.lock, "chan send")
}有缓冲 Channel
ch <- value
// 伪代码
func chansend(ch *hchan, ep unsafe.Pointer) {
lock(&ch.lock)
// 1. 如果有等待的接收者,直接发送
if !empty(ch.recevq) {
sg := dequeue(&ch.recevq)
memcpy(sg.elem, ep, ch.elemsize)
unlock(&ch.lock)
goready(sg.g)
return
}
// 2. 如果缓冲区未满,写入缓冲区
if ch.qcount < ch.dataqsiz {
memcpy(ch.buf[ch.sendx], ep, ch.elemsize)
ch.sendx = (ch.sendx + 1) % ch.dataqsiz
ch.qcount++
unlock(&ch.lock)
return
}
// 3. 缓冲区已满,加入发送队列
sg := acquireSudog()
sg.elem = ep
enqueue(&ch.sendq, sg)
goparkunlock(&ch.lock, "chan send")
}接收操作流程
val := <-ch
// 伪代码
func chanrecv(ch *hchan, ep unsafe.Pointer) {
lock(&ch.lock)
// 1. 如果有等待的发送者
if !empty(ch.sendq) {
sg := dequeue(&ch.sendq)
memcpy(ep, sg.elem, ch.elemsize)
// 如果是有缓冲 channel,从缓冲区复制到发送者
if ch.dataqsiz > 0 {
memcpy(ep, &ch.buf[ch.recvx], ch.elemsize)
memcpy(&ch.buf[ch.recvx], sg.elem, ch.elemsize)
ch.recvx = (ch.recvx + 1) % ch.dataqsiz
}
unlock(&ch.lock)
goready(sg.g)
return
}
// 2. 如果缓冲区有数据
if ch.qcount > 0 {
memcpy(ep, &ch.buf[ch.recvx], ch.elemsize)
ch.recvx = (ch.recvx + 1) % ch.dataqsiz
ch.qcount--
unlock(&ch.lock)
return
}
// 3. 无数据可接收,加入接收队列
sg := acquireSudog()
sg.elem = ep
enqueue(&ch.recevq, sg)
goparkunlock(&ch.lock, "chan receive")
}关键点
- 锁保护:所有操作都持有锁
- 直接传递:有等待队列时,直接拷贝数据(零拷贝)
- 环形缓冲区:使用 sendx 和 recvx 实现环形队列
- 等待队列:sendq 和 recvq 存储阻塞的 goroutine
扩展:Select 实现
select {
case v := <-ch1:
fmt.Println(v)
case ch2 <- 1:
fmt.Println("sent")
case <-time.After(time.Second):
fmt.Println("timeout")
}Select 实现原理
// 伪代码
func selectgo(cas []scase) (int, bool) {
// 1. 随机打乱 case 顺序(避免饥饿)
shuffle(cas)
// 2. 顺序检查每个 case
for i, c := range cas {
if c.isRecv && !empty(c.ch.recvq) {
return i, true // 可接收
}
if c.isSend && !empty(c.ch.sendq) {
return i, true // 可发送
}
}
// 3. 没有可执行的 case
if hasDefault {
return defaultIndex, false // 执行 default
}
// 4. 加入所有 case 的等待队列
for i, c := range cas {
enqueue(&c.ch.waitq, getg())
}
// 5. 阻塞,直到某个 case 就绪
gopark()
// 6. 被唤醒后,返回执行的 case
return selectedCase, true
}