Channel
Channel 是 Go 语言中 Goroutine 之间的通信管道。它遵循 CSP(Communicating Sequential Processes)模型,允许一个 Goroutine 将值发送到 Channel,另一个 Goroutine 从 Channel 接收值,从而实现安全的数据传递和同步。Channel 是类型安全的,一个 Channel 只能传递一种特定类型的值。
Channel 的创建
make(chan T)
Channel 是引用类型,必须使用 make 函数创建。未初始化的 Channel 值为 nil。
// 声明并创建 Channel
ch1 := make(chan int) // 无缓冲 channel
ch2 := make(chan string, 10) // 缓冲大小为 10 的 channel
ch3 := make(chan struct{}) // 用于信号通知的空结构体 channel
ch4 := make(chan any) // any 类型的 channel(不推荐)
// Channel 的零值是 nil
var ch5 chan float64 // nil channelChannel 的类型
// 双向 channel
var ch chan int // 可以发送和接收
// 单向 channel(只发)
var sendOnly chan<- int // 只能发送,不能接收
// 单向 channel(只收)
var recvOnly <-chan int // 只能接收,不能发送有缓冲 vs 无缓冲 Channel
无缓冲 Channel(Unbuffered Channel)
无缓冲 Channel 在发送和接收操作之间建立同步关系:发送操作会阻塞直到有另一个 Goroutine 接收;接收操作会阻塞直到有另一个 Goroutine 发送。它是一种同步通信机制,不存储任何数据。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int) // 无缓冲 channel
// 发送方
go func() {
fmt.Println("发送方: 准备发送数据...")
ch <- 42
fmt.Println("发送方: 数据已被接收")
}()
// 接收方
time.Sleep(500 * time.Millisecond)
fmt.Println("接收方: 准备接收数据...")
value := <-ch
fmt.Printf("接收方: 收到数据 %d\n", value)
// 输出:
// 发送方: 准备发送数据...
// 接收方: 准备接收数据...
// 接收方: 收到数据 42
// 发送方: 数据已被接收
}有缓冲 Channel(Buffered Channel)
有缓冲 Channel 在内部维护一个固定大小的队列。发送操作在缓冲区未满时不会阻塞;接收操作在缓冲区非空时不会阻塞。当缓冲区满时,发送操作阻塞;当缓冲区为空时,接收操作阻塞。
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 3) // 缓冲大小为 3
// 缓冲区未满,发送不会阻塞
ch <- 1
fmt.Println("发送 1,缓冲区: [1]")
ch <- 2
fmt.Println("发送 2,缓冲区: [1, 2]")
ch <- 3
fmt.Println("发送 3,缓冲区: [1, 2, 3](已满)")
// 缓冲区满,再发送会阻塞(在另一个 goroutine 中)
go func() {
ch <- 4 // 这行会阻塞,直到有人接收
fmt.Println("发送 4 成功")
}()
// 接收数据,释放缓冲区
fmt.Println("接收:", <-ch) // 接收 1
fmt.Println("接收:", <-ch) // 接收 2
// 此时发送方的 ch <- 4 可以执行了
fmt.Println("接收:", <-ch) // 接收 3
fmt.Println("接收:", <-ch) // 接收 4
}对比总结
| 特性 | 无缓冲 Channel | 有缓冲 Channel |
|---|---|---|
| 声明 | make(chan T) | make(chan T, n) |
| 发送行为 | 阻塞直到接收者准备好 | 缓冲区满时阻塞 |
| 接收行为 | 阻塞直到发送者准备好 | 缓冲区空时阻塞 |
| 用途 | 同步 Goroutine | 异步处理,解耦生产消费速率 |
| 容量 | 0 | n |
| 典型场景 | 信号传递、握手协议 | 任务队列、限流 |
如何选择?
- 需要保证同步时用无缓冲 Channel(如”完成信号”)
- 需要解耦生产者和消费者的处理速度时用有缓冲 Channel(如”任务队列”)
- 默认倾向于使用有缓冲 Channel,除非你有明确的同步需求
Channel 的发送、接收与关闭
发送和接收
ch <- value // 发送 value 到 channel
value := <-ch // 从 channel 接收值
<-ch // 从 channel 接收值,丢弃
// 接收时检查 channel 是否已关闭
value, ok := <-ch
if !ok {
// channel 已关闭,没有更多数据
}关闭 Channel
close(ch) // 关闭 channelChannel 关闭规则(重要)
- 只有发送者应该关闭 channel,接收者不应该关闭
- 关闭已关闭的 channel 会 panic
- 向已关闭的 channel 发送会 panic
- 从已关闭的 channel 接收会返回零值和 false
- 关闭 nil channel 会 panic
完整的发送-接收-关闭模式
package main
import (
"fmt"
"sync"
)
func producer(id int, ch chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 3; i++ {
msg := fmt.Sprintf("生产者 %d-消息 %d", id, i)
ch <- msg
fmt.Println("发送:", msg)
}
}
func consumer(ch <-chan string, done chan<- struct{}) {
for msg := range ch {
fmt.Println(" 接收:", msg)
}
fmt.Println("消费者退出(channel 已关闭)")
done <- struct{}{}
}
func main() {
ch := make(chan string, 10)
done := make(chan struct{})
var wg sync.WaitGroup
// 启动消费者
go consumer(ch, done)
// 启动 2 个生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}
// 等待所有生产者完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
// 等待消费者完成
<-done
}for range 遍历 Channel
for range 可以持续从 Channel 接收值,直到 Channel 被关闭:
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
// 发送方
go func() {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch) // 关闭 channel,for range 会自动退出
}()
// 接收方:for range 会持续接收直到 channel 关闭
for value := range ch {
fmt.Println("收到:", value)
}
fmt.Println("遍历结束")
}
// 输出:
// 收到: 1
// 收到: 2
// 收到: 3
// 收到: 4
// 收到: 5
// 遍历结束for range 和 channel 关闭
for range ch会在 channel 被关闭且缓冲区为空后自动退出循环- 如果 channel 没有被关闭,
for range会永远阻塞 - 绝对不要在接收端关闭 channel——这是 Go 社区的约定
单向 Channel
单向 Channel 用于在函数签名中限制 Channel 的使用方向:chan<- T(只发送)或 <-chan T(只接收)。这提高了代码的安全性,明确表达了函数的意图——接收者不需要知道 Channel 是如何被填充的,发送者也不需要知道数据如何被消费。
基本用法
// 只发送 channel(生产者函数)
func producer(ch chan<- int) {
ch <- 1
ch <- 2
ch <- 3
// ch <- value 只能发送
// value := <-ch // 编译错误:不能从只发送 channel 接收
// close(ch) // 编译错误:不能关闭只发送 channel(除非是创建者)
}
// 只接收 channel(消费者函数)
func consumer(ch <-chan int) {
for value := range ch {
fmt.Println(value)
}
// ch <- value // 编译错误:不能向只接收 channel 发送
}
func main() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}类型转换
双向 Channel 可以隐式转换为单向 Channel,反之不行:
var ch chan int // 双向
var sendOnly chan<- int // 只发送
var recvOnly <-chan int // 只接收
// 双向 → 单向(隐式转换,安全)
sendOnly = ch // ✅
recvOnly = ch // ✅
// 单向 → 双向(不允许)
// ch = sendOnly // ❌ 编译错误
// ch = recvOnly // ❌ 编译错误select 语句
select 语句使一个 Goroutine 可以同时等待多个 Channel 操作。select 会阻塞直到至少有一个 case 可以执行,然后随机选择一个可执行的 case(如果多个同时就绪)。如果没有 case 就绪且有 default,则执行 default。如果没有 default 且没有 case 就绪,select 会阻塞。
基本用法
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自 channel 1"
}()
go func() {
time.Sleep(50 * time.Millisecond)
ch2 <- "来自 channel 2"
}()
// select 等待多个 channel,哪个先就绪执行哪个
select {
case msg1 := <-ch1:
fmt.Println("收到:", msg1)
case msg2 := <-ch2:
fmt.Println("收到:", msg2)
}
// 输出: 收到: 来自 channel 2(ch2 先就绪)
}带 default 的 select(非阻塞操作)
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 1)
ch <- 42 // 发送一个值
select {
case val := <-ch:
fmt.Println("接收到:", val) // 这次会执行
default:
fmt.Println("没有数据")
}
select {
case val := <-ch:
fmt.Println("接收到:", val)
default:
fmt.Println("没有数据,非阻塞返回") // 这次会执行
}
}多个 case 同时就绪
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
// 两个都立即可用
ch1 <- "one"
ch2 <- "two"
// 多个 case 就绪时,随机选择一个
for i := 0; i < 10; i++ {
select {
case <-ch1:
fmt.Print("1 ")
case <-ch2:
fmt.Print("2 ")
}
}
// 输出类似: 1 2 2 1 1 2 1 2 2 1(随机顺序)
}超时处理:time.After()
time.After() 的内存泄漏风险
time.After(d) 会在时间到期前持续占用一个 timer 和一个 channel。在高频调用场景(如循环中的 select),应使用 time.NewTimer(d) + timer.Stop() 替代,以避免资源泄漏。
package main
import (
"context"
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second) // 模拟耗时操作
ch <- "结果数据"
}()
select {
case result := <-ch:
fmt.Println("收到:", result)
case <-time.After(2 * time.Second):
fmt.Println("超时:2 秒内未收到数据")
}
// 输出: 超时:2 秒内未收到数据
}使用 Timer 替代 time.After(推荐)
func fetchDataWithTimeout(ch <-chan string, timeout time.Duration) (string, error) {
timer := time.NewTimer(timeout)
defer timer.Stop() // 确保释放 timer 资源
select {
case data := <-ch:
return data, nil
case <-timer.C:
return "", fmt.Errorf("操作超时")
}
}使用 context.WithTimeout(更推荐)
func fetchData(ctx context.Context) (string, error) {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch <- "结果"
}()
select {
case result := <-ch:
return result, nil
case <-ctx.Done():
return "", ctx.Err() // context.DeadlineExceeded
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
result, err := fetchData(ctx)
if err != nil {
fmt.Println("错误:", err) // 错误: context deadline exceeded
return
}
fmt.Println("结果:", result)
}nil Channel 的行为
nil Channel 是指声明但未使用 make 初始化的 Channel。对 nil Channel 的操作有特殊的行为规则,这在某些并发模式中非常有用。
| 操作 | nil Channel 的行为 |
|---|---|
发送 ch <- value | 永久阻塞 |
接收 <-ch | 永久阻塞 |
关闭 close(ch) | panic |
select 中的 case | 永远不会被选中 |
package main
import (
"fmt"
"time"
)
func main() {
var ch chan int // nil channel
// select 中 nil channel 的 case 永远不会被选中
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- 1
}()
select {
case val := <-ch1:
fmt.Println("ch1:", val) // 会执行
case val := <-ch2:
fmt.Println("ch2:", val) // 不会执行
case val := <-ch: // nil channel
fmt.Println("ch:", val) // 永远不会执行
}
// 输出: ch1: 1
}nil Channel 的妙用
在某些场景下,将 Channel 设为 nil 可以暂时”禁用”某个 select case,而不需要修改 select 语句本身:
var ch chan int // 初始为 nil,该 case 被禁用
// 动态启用
ch = make(chan int)
// 动态禁用
ch = nilChannel 的关闭原则
核心原则
Channel 关闭的黄金法则
- 不要从接收端关闭 channel——你不知道发送者是否还在发送
- 不要向已关闭的 channel 发送——会导致 panic
- 不要关闭 nil channel——会导致 panic
- 不要重复关闭 channel——会导致 panic
- channel 的关闭是可选的——只有当接收端需要知道”没有更多数据”时才关闭
安全关闭 Channel 的模式
模式 1:单个生产者,直接关闭
func producer(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch) // 唯一的生产者,直接关闭
}模式 2:多个生产者,使用 sync.Once
var once sync.Once
func producer(id int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
ch <- id*100 + i
}
once.Do(func() {
close(ch) // 确保只关闭一次
})
}模式 3:多个生产者,使用 done channel
func producer(id int, ch chan int, done <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
return // 收到停止信号,退出
case ch <- id*100 + i:
}
}
}
// 由主函数负责关闭 channel
func main() {
ch := make(chan int, 100)
done := make(chan struct{})
var wg sync.WaitGroup
for id := 1; id <= 3; id++ {
wg.Add(1)
go producer(id, ch, done, &wg)
}
time.Sleep(1 * time.Second)
close(done) // 通知所有生产者停止
wg.Wait() // 等待所有生产者退出
close(ch) // 现在安全地关闭 channel
}常见错误
错误 1:向已关闭的 channel 发送
ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel错误 2:关闭 nil channel
var ch chan int
close(ch) // panic: close of nil channel错误 3:重复关闭 channel
ch := make(chan int)
close(ch)
close(ch) // panic: close of closed channel错误 4:从已关闭的 channel 接收得到零值
ch := make(chan int, 2)
ch <- 42
ch <- 100
close(ch)
fmt.Println(<-ch) // 42
fmt.Println(<-ch) // 100
fmt.Println(<-ch) // 0(零值)
v, ok := <-ch
fmt.Println(v, ok) // 0 false检测 channel 是否关闭
接收操作可以使用双值赋值 value, ok := <-ch 来检测 channel 是否已关闭:
ok为true:接收到有效数据ok为false:channel 已关闭,value是零值
错误 5:在循环中忘记 break 导致 case 穿透
// 注意:Go 的 select 中 case 不需要 break
// 但 select 本身不穿透,这一点与 switch 不同
select {
case <-ch1:
handle1()
case <-ch2:
handle2()
}
// 每个 case 独立执行,不会穿透练习题
练习 1:Channel 方向与生产者-消费者
编写一个程序,包含一个生产者函数 produce(ch chan<- int) 和一个消费者函数 consume(ch <-chan int, done chan<- struct{})。生产者向 channel 发送 1-10 的整数,消费者接收并打印所有值,完成后通过 done channel 通知主函数。
代码:
package main
import (
"fmt"
"sync"
)
func produce(ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 10; i++ {
ch <- i
fmt.Printf("生产: %d\n", i)
}
fmt.Println("生产者完成")
}
func consume(ch <-chan int, done chan<- struct{}) {
for val := range ch {
fmt.Printf(" 消费: %d\n", val)
}
fmt.Println("消费者退出")
close(done)
}
func main() {
ch := make(chan int, 5) // 有缓冲,解耦速率
done := make(chan struct{})
var wg sync.WaitGroup
// 启动消费者
go consume(ch, done)
// 启动生产者
wg.Add(1)
go produce(ch, &wg)
// 生产者完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
<-done
fmt.Println("程序结束")
}输出(生产消费交替进行):
生产: 1
消费: 1
生产: 2
消费: 2
...
生产: 10
生产者完成
消费: 10
消费者退出
程序结束练习 2:select 多路复用与超时
编写一个程序,启动 3 个 Goroutine 分别执行不同耗时的任务(1s、2s、3s),每个任务完成后将结果发送到各自的 channel。使用 select 接收最先完成的任务结果,要求支持 2.5 秒超时。
代码:
package main
import (
"fmt"
"time"
)
func task(id string, duration time.Duration, ch chan<- string) {
time.Sleep(duration)
ch <- fmt.Sprintf("任务 %s 完成(耗时 %v)", id, duration)
}
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
ch3 := make(chan string)
go task("A", 1*time.Second, ch1)
go task("B", 2*time.Second, ch2)
go task("C", 3*time.Second, ch3)
// 接收所有任务结果
results := make([]string, 0, 3)
timeout := time.After(2.5 * time.Second)
for len(results) < 3 {
select {
case r := <-ch1:
fmt.Println("收到:", r)
results = append(results, r)
ch1 = nil // 禁用该 case
case r := <-ch2:
fmt.Println("收到:", r)
results = append(results, r)
ch2 = nil
case r := <-ch3:
fmt.Println("收到:", r)
results = append(results, r)
ch3 = nil
case <-timeout:
fmt.Println("⚠️ 超时!还有任务未完成")
// 可以在这里进行清理
return
}
}
fmt.Printf("\n所有任务完成,共 %d 个结果\n", len(results))
}输出:
收到: 任务 A 完成(耗时 1s)
收到: 任务 B 完成(耗时 2s)
收到: 任务 C 完成(耗时 3s)
所有任务完成,共 3 个结果关键点:将已接收的 channel 设为 nil 来禁用其 case,避免重复处理。
练习 3:Channel 常见错误分析
以下代码中有哪些问题?请逐一指出并修复。
func main() {
var ch chan int
go func() {
ch <- 1
}()
close(ch)
val := <-ch
fmt.Println(val)
}问题分析:
- 第 3 行
var ch chan int:声明了 nil channel - 第 6 行
ch <- 1:向 nil channel 发送会永久阻塞,Goroutine 永远不会退出 - 第 9 行
close(ch):关闭 nil channel 会导致 panic
修复版本:
func main() {
// 修复 1:使用 make 初始化 channel
ch := make(chan int, 1) // 使用有缓冲 channel 防止发送阻塞
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch <- 1 // 现在可以正常发送
}()
// 等待发送完成后再关闭
wg.Wait()
// 修复 2:只在发送完成后关闭
close(ch)
// 修复 3:接收已关闭的 channel 时使用 ok 检查
if val, ok := <-ch; ok {
fmt.Println("收到:", val)
} else {
fmt.Println("channel 已关闭")
}
}
// 输出: 收到: 1总结:使用 channel 前必须用 make 初始化;发送和关闭操作需要正确的同步;接收已关闭的 channel 时应检查 ok 值。
