Channel

https://mp.weixin.qq.com/s/QgNndPgN1kqxWh-ijSofkw

1. 简介

Channel 是 Go 语言中一种特殊的类型,它是一个并发安全的队列,用于在 goroutine 之间传递数据。

2. 数据结构

Channel 的数据结构如下:

2.1 hchan

type hchan struct {
    // 循环队列
    qcount   uint           // 通道中数据个数
    dataqsiz uint           // buf长度
    buf      unsafe.Pointer // 数组指针
    sendx    uint   // send index
    recvx    uint   // receive index
    elemsize uint16 // 元素大小
    elemtype *_type // 元素类型
    
    closed   uint32 // 通道关闭标志
    
    recvq    waitq  // 由双向链表实现的recv waiters队列
    sendq    waitq  // 由双向链表实现的send waiters队列
    lock mutex
}

qcount: 通道中数据的个数
dataqsiz: 数组长度
buf: 指向数组的指针,数组中存储往channel发送的数据
sendx: 往数组中发送元素的index
recvx: 从数组中接收元素的index
elemsize: 数组中存储的元素大小
elemtype: 数组中存储的元素类型
closed: 通道关闭的标志
recvq: 因为读取channel而陷入阻塞的协程等待队列
sendq: 因为写入channel而陷入阻塞的协程等待队列。
lock: 用于保护hchan的互斥锁。

2.2 waitq

// 等待队列 (双向链表)
type waitq struct {
    first *sudog
    last *sudog
}

waitq是因为读写channel而陷入阻塞的协程等待队列

  1. first 队列头部
  2. last 队列尾部

2.3 sudog

type sudog struct {
    g *g // 等待send或recv的协程g
    next *sudog // 等待队列下一个结点next
    prev *sudog // 等待队列前一个结点prev
    elem unsafe.Pointer // data element (may point to stack)
    
    success bool // 标记协程g被唤醒是因为数据传递(true)还是channel被关闭(false)
    c        *hchan // channel

}

sudog 是 “synchronization user data object” 的缩写,它代表一个正在等待的 goroutine。

sudog是协程等待队列的节点:

g 因读写而陷入阻塞的协程

next 等待队列下一个节点

prev 等待队列前一个节点

elem 对于写channel,表示需要发送到channel的数据指针;对于读channel,表示需要被赋值的数据指针。

success 标记协程被唤醒是因为数据传递(true)还是channel被关闭(false)

c 指向channel的指针

3. 操作

3.1 创建channel

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // buf数组所需分配内存大小
    mem := elem.size*uintptr(size)
    var c *hchan
    switch {
    case mem == 0:// Unbuffered channels,buf无需内存分配
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0: // Buffered channels,通道元素类型非指针
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Buffered channels,通道元素类型是指针
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

makechan 函数是 Go 语言中创建 channel 的核心函数。 它根据 channel 的类型和缓冲区大小,分配内存,初始化 hchan 结构体的各个字段,并返回指向该结构体的指针。

3.2 通道写入

通道关闭情况和通道为nil的情况

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 1.channel为nil
    if c == nil {
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    
    lock(&c.lock) //加锁
    
    // 2.如果channel已关闭,直接panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
   
    // Block on the channel. 
    mysg := acquireSudog()
    c.sendq.enqueue(mysg) // 入sendq等待队列
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    
    
    closed := !mysg.success // 协程被唤醒的原因是因为数据传递还是通道被关闭
    // 3.因channel被关闭导致阻塞写协程被唤醒并panic
    if closed {
        panic(plainError("send on closed channel"))
    }
}

  • 如果channel为nil,直接panic。
  • 如果channel不为nil,且channel已经关闭,往已经关闭的channel发送数据,会导致panic。
  • 对因写入而陷入阻塞的协程,如果channel被关闭,阻塞协程会被唤醒并panic.

3.2.1 无缓冲channel

case1:写时存在阻塞读协程

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...

    lock(&c.lock)

    // ...

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    
    // ...
  • 加锁;
  • 从阻塞度协程队列中取出一个 goroutine 的封装对象 sudog;
  • 在 send 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine;
  • 在 send 方法中会完成解锁动作.

3.2.2 写时无阻塞读协程但环形缓冲区仍有空间

case2 : 写时无阻塞读协程但环形缓冲区仍有空间

这个时候,会将元素拷贝到环形缓冲区中,然后解锁。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)
    // ...
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // ...
}
  • 加锁;
  • 将当前元素添加到环形缓冲区 sendx 对应的位置;
  • sendx++;
  • qcount++;
  • 解锁,返回.

3.2.3 写时无阻塞读协程且环形缓冲区无空间


case3: 写时无阻塞读协程且环形缓冲区无空间

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...
    lock(&c.lock)

    // ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    c.sendq.enqueue(mysg)
    
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    
    gp.waiting = nil
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

• 加锁;
• 构造封装当前 goroutine 的 sudog 对象;
• 完成指针指向,建立 sudog、goroutine、channel 之间的指向关系;
• 把 sudog 添加到当前 channel 的阻塞写协程队列中;
• park 当前协程;
• 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走);
• 解锁,返回

3.3 读取

3.3.1 读取总流程

3.3.2 读时有阻塞的写协程

3.3.3 读时无阻塞写协程且缓冲区有元素

3.3.4 读时无阻塞写协程且缓冲区无元素

4. 阻塞与非阻塞模式

在上述源码分析流程中,均是以阻塞模式为主线进行讲述,忽略非阻塞模式的有关处理逻辑. 此处阐明两个问题:

• 非阻塞模式下,流程逻辑有何区别?

• 何时会进入非阻塞模式?

关闭

  • 关闭未初始化过的 channel 会 panic;

  • 加锁;

  • 重复关闭 channel 会 panic;

  • 将阻塞读协程队列中的协程节点统一添加到 glist;

  • 将阻塞写协程队列中的协程节点统一添加到 glist;

  • 唤醒 glist 当中的所有协程.

  • 向已经 close 的 channel 发送数据会 panic (不是在这个函数里 panic,而是在发送方 Goroutine 醒来后 panic)。

在 closechan 函数中,glist 的主要作用是 收集所有需要被唤醒的 Goroutine,然后进行批量唤醒。
唤醒在 closechan 中收集的 Goroutine 的目的是为了让这些 Goroutine 知道 Channel 已经关闭,并采取相应的行动。

func closechan(c *hchan) {
    // 1. 前置条件检查:
    //   - 如果 channel 为 nil,panic。
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 2. 加锁:
    //   - 防止并发的 close,以及与其他 channel 操作的竞争。
    lock(&c.lock)

    // 3. 检查 channel 是否已经关闭:
    //   - 如果 channel 已经关闭,再次 close 会 panic。
    if c.closed != 0 {
        unlock(&c.lock) // 先解锁,再 panic,避免死锁。
        panic(plainError("close of closed channel"))
    }

    // 4. 标记 channel 为已关闭:
    c.closed = 1

    // 5. 处理等待接收 (recvq) 的 Goroutine:
    //   - 遍历 recvq 中的所有等待 Goroutine (sg, sudog)。
    //   - 从 recvq 中移除这些 Goroutine。
    //   - 如果 sg.elem != nil,表示接收方需要接收数据,但现在 channel 关闭了,没有数据可接收,需要清理接收方的内存。
    //   - 设置 sg.success = false,通知接收方 channel 已关闭。
    //   - 将这些 Goroutine 添加到 glist (gList),准备唤醒。
    var glist gList  // 用于批量唤醒 Goroutine
    for {
        sg := c.recvq.dequeue()  // 从接收队列中取出一个等待的 sudog
        if sg == nil {
            break  // 接收队列为空,结束循环
        }
        if sg.elem != nil { //如果需要把channel里的数据写到接受方
            typedmemclr(c.elemtype, sg.elem) //将接受方的数据清空,避免出现旧数据
            sg.elem = nil //设置为空
        }
        gp := sg.g //获取到对应的g
        gp.param = unsafe.Pointer(sg) //设置参数
        sg.success = false // 设置success为false,表明接受失败因为channel已经关闭
        glist.push(gp) // 添加到等待唤醒的队列里
    }

    // 6. 处理等待发送 (sendq) 的 Goroutine:
    //   - 遍历 sendq 中的所有等待 Goroutine。
    //   - 从 sendq 中移除这些 Goroutine。
    //   - 设置 sg.success = false,通知发送方 channel 已关闭,发送失败。
    //   - **注意:尝试向已关闭的 channel 发送数据会 panic,这里只是移除了等待的 Goroutine,panic 会在稍后由发送方 Goroutine 触发。**
    //   - 将这些 Goroutine 添加到 glist,准备唤醒。
    for {
        sg := c.sendq.dequeue() //在发送队列取数据
        if sg == nil {
            break //如果发送队列为空,则退出
        }
        sg.elem = nil //设置为空
        gp := sg.g //拿到对应的g
        gp.param = unsafe.Pointer(sg) //设置参数
        sg.success = false // 设置success为false
        glist.push(gp) //添加到等待唤醒的gp队列
    }

    // 7. 解锁:
    //   - 释放 channel 的锁,允许其他 Goroutine 访问。
    unlock(&c.lock)

    // 8. 批量唤醒 Goroutine:
    //   - 遍历 glist 中的所有 Goroutine,将它们设置为可运行状态,并放入调度队列。
    //   - 这些 Goroutine 醒来后,会检查 sg.success 的值,从而得知 channel 是否已关闭,以及操作是否成功。那些尝试发送的goroutine会panic。
    for !glist.empty() {
        gp := glist.pop() //从链表中取值
        gp.schedlink = 0 //gc链表置空
        goready(gp, 3)   // 设置g为可运行状态, 3 is the value of m._defer.spindone
    }
}