Channel
https://mp.weixin.qq.com/s/QgNndPgN1kqxWh-ijSofkw
1. 简介
Channel 是 Go 语言中一种特殊的类型,它是一个并发安全的队列,用于在 goroutine 之间传递数据。
2. 数据结构
Channel 的数据结构如下:
2.1 hchan
type hchan struct {
// 循环队列
qcount uint // 通道中数据个数
dataqsiz uint // buf长度
buf unsafe.Pointer // 数组指针
sendx uint // send index
recvx uint // receive index
elemsize uint16 // 元素大小
elemtype *_type // 元素类型
closed uint32 // 通道关闭标志
recvq waitq // 由双向链表实现的recv waiters队列
sendq waitq // 由双向链表实现的send waiters队列
lock mutex
}
qcount: 通道中数据的个数
dataqsiz: 数组长度
buf: 指向数组的指针,数组中存储往channel发送的数据
sendx: 往数组中发送元素的index
recvx: 从数组中接收元素的index
elemsize: 数组中存储的元素大小
elemtype: 数组中存储的元素类型
closed: 通道关闭的标志
recvq: 因为读取channel而陷入阻塞的协程等待队列
sendq: 因为写入channel而陷入阻塞的协程等待队列。
lock: 用于保护hchan的互斥锁。
2.2 waitq
// 等待队列 (双向链表)
type waitq struct {
first *sudog
last *sudog
}
waitq是因为读写channel而陷入阻塞的协程等待队列
- first 队列头部
- last 队列尾部
2.3 sudog
type sudog struct {
g *g // 等待send或recv的协程g
next *sudog // 等待队列下一个结点next
prev *sudog // 等待队列前一个结点prev
elem unsafe.Pointer // data element (may point to stack)
success bool // 标记协程g被唤醒是因为数据传递(true)还是channel被关闭(false)
c *hchan // channel
}
sudog 是 “synchronization user data object” 的缩写,它代表一个正在等待的 goroutine。
sudog是协程等待队列的节点:
g 因读写而陷入阻塞的协程
next 等待队列下一个节点
prev 等待队列前一个节点
elem 对于写channel,表示需要发送到channel的数据指针;对于读channel,表示需要被赋值的数据指针。
success 标记协程被唤醒是因为数据传递(true)还是channel被关闭(false)
c 指向channel的指针
3. 操作
3.1 创建channel
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// buf数组所需分配内存大小
mem := elem.size*uintptr(size)
var c *hchan
switch {
case mem == 0:// Unbuffered channels,buf无需内存分配
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0: // Buffered channels,通道元素类型非指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Buffered channels,通道元素类型是指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
makechan 函数是 Go 语言中创建 channel 的核心函数。 它根据 channel 的类型和缓冲区大小,分配内存,初始化 hchan 结构体的各个字段,并返回指向该结构体的指针。
3.2 通道写入
通道关闭情况和通道为nil的情况
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 1.channel为nil
if c == nil {
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock) //加锁
// 2.如果channel已关闭,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// Block on the channel.
mysg := acquireSudog()
c.sendq.enqueue(mysg) // 入sendq等待队列
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
closed := !mysg.success // 协程被唤醒的原因是因为数据传递还是通道被关闭
// 3.因channel被关闭导致阻塞写协程被唤醒并panic
if closed {
panic(plainError("send on closed channel"))
}
}
- 如果channel为nil,直接panic。
- 如果channel不为nil,且channel已经关闭,往已经关闭的channel发送数据,会导致panic。
- 对因写入而陷入阻塞的协程,如果channel被关闭,阻塞协程会被唤醒并panic.
3.2.1 无缓冲channel
case1:写时存在阻塞读协程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// ...
- 加锁;
- 从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog;
- 在 send 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine;
- 在 send 方法中会完成解锁动作.
3.2.2 写时无阻塞读协程但环形缓冲区仍有空间
case2 : 写时无阻塞读协程但环形缓冲区仍有空间
这个时候,会将元素拷贝到环形缓冲区中,然后解锁。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// ...
}
- 加锁;
- 将当前元素添加到环形缓冲区 sendx 对应的位置;
- sendx++;
- qcount++;
- 解锁,返回.
3.2.3 写时无阻塞读协程且环形缓冲区无空间
case3: 写时无阻塞读协程且环形缓冲区无空间
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
gp.waiting = nil
closed := !mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
• 加锁;
• 构造封装当前 goroutine 的 sudog 对象;
• 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系;
• 把 sudog 添加到当前 channel 的阻塞写协程队列中;
• park 当前协程;
• 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走);
• 解锁,返回
3.3 读取
3.3.1 读取总流程
3.3.2 读时有阻塞的写协程
3.3.3 读时无阻塞写协程且缓冲区有元素
3.3.4 读时无阻塞写协程且缓冲区无元素
4. 阻塞与非阻塞模式
在上述源码分析流程中,均是以阻塞模式为主线进行讲述,忽略非阻塞模式的有关处理逻辑. 此处阐明两个问题:
• 非阻塞模式下,流程逻辑有何区别?
• 何时会进入非阻塞模式?
关闭
-
关闭未初始化过的 channel 会 panic;
-
加锁;
-
重复关闭 channel 会 panic;
-
将阻塞读协程队列中的协程节点统一添加到 glist;
-
将阻塞写协程队列中的协程节点统一添加到 glist;
-
唤醒 glist 当中的所有协程.
-
向已经 close 的 channel 发送数据会 panic (不是在这个函数里 panic,而是在发送方 Goroutine 醒来后 panic)。
在 closechan 函数中,glist 的主要作用是 收集所有需要被唤醒的 Goroutine,然后进行批量唤醒。
唤醒在 closechan 中收集的 Goroutine 的目的是为了让这些 Goroutine 知道 Channel 已经关闭,并采取相应的行动。
func closechan(c *hchan) {
// 1. 前置条件检查:
// - 如果 channel 为 nil,panic。
if c == nil {
panic(plainError("close of nil channel"))
}
// 2. 加锁:
// - 防止并发的 close,以及与其他 channel 操作的竞争。
lock(&c.lock)
// 3. 检查 channel 是否已经关闭:
// - 如果 channel 已经关闭,再次 close 会 panic。
if c.closed != 0 {
unlock(&c.lock) // 先解锁,再 panic,避免死锁。
panic(plainError("close of closed channel"))
}
// 4. 标记 channel 为已关闭:
c.closed = 1
// 5. 处理等待接收 (recvq) 的 Goroutine:
// - 遍历 recvq 中的所有等待 Goroutine (sg, sudog)。
// - 从 recvq 中移除这些 Goroutine。
// - 如果 sg.elem != nil,表示接收方需要接收数据,但现在 channel 关闭了,没有数据可接收,需要清理接收方的内存。
// - 设置 sg.success = false,通知接收方 channel 已关闭。
// - 将这些 Goroutine 添加到 glist (gList),准备唤醒。
var glist gList // 用于批量唤醒 Goroutine
for {
sg := c.recvq.dequeue() // 从接收队列中取出一个等待的 sudog
if sg == nil {
break // 接收队列为空,结束循环
}
if sg.elem != nil { //如果需要把channel里的数据写到接受方
typedmemclr(c.elemtype, sg.elem) //将接受方的数据清空,避免出现旧数据
sg.elem = nil //设置为空
}
gp := sg.g //获取到对应的g
gp.param = unsafe.Pointer(sg) //设置参数
sg.success = false // 设置success为false,表明接受失败因为channel已经关闭
glist.push(gp) // 添加到等待唤醒的队列里
}
// 6. 处理等待发送 (sendq) 的 Goroutine:
// - 遍历 sendq 中的所有等待 Goroutine。
// - 从 sendq 中移除这些 Goroutine。
// - 设置 sg.success = false,通知发送方 channel 已关闭,发送失败。
// - **注意:尝试向已关闭的 channel 发送数据会 panic,这里只是移除了等待的 Goroutine,panic 会在稍后由发送方 Goroutine 触发。**
// - 将这些 Goroutine 添加到 glist,准备唤醒。
for {
sg := c.sendq.dequeue() //在发送队列取数据
if sg == nil {
break //如果发送队列为空,则退出
}
sg.elem = nil //设置为空
gp := sg.g //拿到对应的g
gp.param = unsafe.Pointer(sg) //设置参数
sg.success = false // 设置success为false
glist.push(gp) //添加到等待唤醒的gp队列
}
// 7. 解锁:
// - 释放 channel 的锁,允许其他 Goroutine 访问。
unlock(&c.lock)
// 8. 批量唤醒 Goroutine:
// - 遍历 glist 中的所有 Goroutine,将它们设置为可运行状态,并放入调度队列。
// - 这些 Goroutine 醒来后,会检查 sg.success 的值,从而得知 channel 是否已关闭,以及操作是否成功。那些尝试发送的goroutine会panic。
for !glist.empty() {
gp := glist.pop() //从链表中取值
gp.schedlink = 0 //gc链表置空
goready(gp, 3) // 设置g为可运行状态, 3 is the value of m._defer.spindone
}
}