hello world

stay foolish, stay hungry

理解Golang channel

channel 是 golang 的核心特性之一,提供了 goroutine 通信的机制,简化了并发模式。

CSP

golang 通过 goroutine 和 channel 部分实现了 CSP(Communicating Sequential Process)。CSP 即通信顺序进程,是 Tony Hoare 于 1977 年提出的一种并发模型。CSP 模型由并发执行的实体(线程或者进程)所组成,实体之间通过发送消息进行通信,这里发送消息时使用的就是通道,或者叫 channel。CSP 模型的关键是关注 channel,而不关注发送消息的实体,golang 中 goroutine 对应 CSP 中并发执行的实体,channel 也就对应着 CSP 中的 channel。

channel基础

channel 底层是 hchan 结构体,源码在 src/runtime/chan.go。

// 程序 1
type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // 队列大小, 值大于0表示有缓冲, 值等于0表示没有缓冲
    buf      unsafe.Pointer // 缓冲队列buffer的指针
    elemsize uint16 // 单个元素大小
    closed   uint32 // 关闭标识符
    elemtype *_type // 元素类型
    sendx    uint   // 发送数据索引
    recvx    uint   // 接收数据索引
    recvq    waitq  // 等待接收数据的sudog(goroutine)链表
    sendq    waitq  // 等待发送数据的sudog(goroutine)链表
    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

sudog 表示在等待队列中的 goroutine。

// 程序 2
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.
    g *g
    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)
    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.
    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

创建channel

通过内置的 make 函数可以创建带缓冲区或不带缓冲区的 channel:

// 程序 3
ch := make(chan Task) // 创建非缓冲channel
ch := make(chan Task, 3) // 创建缓冲channel

make 函数创建并实例化 hchan 结构体,并返回 hchan 的指针,所以我们可以直接将 channel 作为方法的参数。

创建 channel 的函数是 func makechan(t *chantype, size int) *hchan

// 程序 4
func makechan(t *chantype, size int) *hchan {
    elem := t.elem
        // 安全检测
    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
    case mem == 0:
                // 如果队列或元素大小是0,则不分配buffer
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
                // 将buf指向自身,不分配缓存空间
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
                // 如果元素不包含指针,则分配一整块内存,用于hchan和buf
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
                // 如果是指针类型,正常创建结构体,buf单独分配空间
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
        // 设置channel的属性
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; 
                    dataqsiz=", size, "\n")
    }
    return c
}

使用channel

发送和接收数据过程

G1 是生产者,G2 是消费者,ch 是容量为 3 的带缓冲 channel,初始的时候 hchan 结构体的 buf 为空,sendxrecvx 都为0。G1 获取锁,并将 task 发送到 channel,并增加 sendx,发送到 channel 中的数据其实是task的副本。然后 G2 获取锁,并从 channel 中获取数据,并增加 recvx,取到的数据依旧是 task 的副本。channel 受互斥锁保护,传递副本可以保证数据的安全。

整个过程没有内存共享(shared memory),充分体现了 CSP 的 do not communicate by sharing memory; instead, share memory by communicating。

blocking/unblocking过程

如果 G2 的读取速度比 G1 的写入速度慢,那么一段时间之后,channe l的 buffer 会被塞满。当 channel 满了之后 G1 继续往 channel 中发送数据。G1 会 block。基于 golang 的调度模型,暂停的其实是 goroutine,而不是 os 线程。

如果需要 block G1,G1 会创建一个 sudog,放到 channel 的 sendq 中,当 channel 的 buffer 有了空间时,G2 会从 sendq 中 pop 出 sudog,取出 elem,将 G1 状态变成 runnable,调度器就可以再次调度 G1 了。

如果 G2 先运行,这时候 channel 中没有数据,G2 从一个空的 channel 中读取数据,G2 也会 block,和 G1 block 的逻辑相似,G2 也会创建 sudog,然后放到 recvq 中。

当此时 G1 向 channel 发送数据,runtime 会直接将 G1 要发送的数据 copy 到 G2 的栈空间,看起来就好像是 G1 直接将数据发送给了 G2,这个过程叫「direct send」,整个过程 G1 和 G2 都不需要再获取锁和读写 buffer。

非缓冲 channel 总是「direct send」的,如果 receiver 先运行,sender 直接将数据写入 receiver 的栈空间,如果 sennder 先运行,receiver 直接从 sudog 接收数据。

向channel发送数据的源码

发送数据对应的方法是 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

  1. 如果向 nil channel 发送数据,会一直block:
// 程序 5
if c == nil {
        if !block {
        return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
}

gopark 表示将当前 goroutine 休眠,但是 unlockf 是 nil,所以 goroutine 会一直休眠。如果所有 goroutine 都处于休眠状态,在 runtime 的 checkdead() 会检测异常情况, 抛出 all goroutines are asleep - deadlock!

  1. 如果 recvq 中有等待的 sudog:
// 程序 6
if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

这里直接将数据复制给 receiver,即上文提到的「direct send」。

  1. 如果 buffered channel 并且 buffer 有空间:
// 程序 7
if c.qcount < c.dataqsiz {
    // Space is available in the channel buffer. Enqueue the element to send.
    qp: = chanbuf(c, c.sendx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock( & c.lock)
    return true
}

通过 qcountdataqsiz 属性判断 hchan.buf 是否有可用空间,如果 buffer 有空间,则将数据 copy 进 buffer。

  1. 如果 buffer 满了,或者没开启 buffer,则 block。
// 程序 8
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, 
    traceEvGoBlockSend, 2)

从channel读取数据的源码

读取数据对应的方法是 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

  1. 如果从 nil channel 中接受数据会一直 block。
// 程序 9
if c == nil {
    if !block {
    return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
}

和向 nil channel 发送数据类似,从 nil channel 接收数据也会一直 block。

  1. 从 closed 状态的channel接受数据:
// 程序 10
if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
        raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
        typedmemclr(c.elemtype, ep)
    }
    return true, false
}

如果 buffer 中有数据(if raceenabled),则返回 buffer 中的数据;如果 buffer 中没数据了,则返回默认值,并且第二个返回参数返回 false。

  1. 如果 sendq 中有等待发送的 sudgo
// 程序 11
if sg := c.sendq.dequeue(); sg != nil {
    // Found a waiting sender. If buffer is size 0, receive value
    // directly from sender. Otherwise, receive from head of queue
    // and add sender's value to the tail of the queue (both map to
    // the same buffer slot because the queue is full).
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

说明队列已满,如果没开启 buffer,则直接从 sender 读取数据。否则,从队列头读取数据,并把 sender 的数据放到队列尾(由于 buffer 是循环队列, 所以队列尾就是刚才读取数据的位置)。

  1. 如果队列中有数据
// 程序 12
if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}

如果队列中有数据,则直接从队列中读取数据。

  1. 如果队列中没数据,也没有 goroutine 向队列中发送数据
// 程序 13
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, 
    traceEvGoBlockRecv, 2)

如果队列中没数据,也没有 goroutine 向队列中发送数据,goroutine 会 block。

channel是否必须close

channel 可以不关闭,如果 channel 不再被使用,即使不关闭也会被回收。通常 close channel 会作为 channel 不会再有数据的控制信号,如果接收方不关心 channel 中是否还会有数据,那么没必要主动关闭 channel。可以参考 Design Question: Channel Closing。如果要关闭 channel,则最好是由发送方来关闭。

Note that it is only necessary to close a channel if the receiver is looking for a close. Closing the channel is a control signal on the channel indicating that no more data follows.

总结

这篇文章描述了 channel 的基本用法和内部的机制,我们从这篇文章了解到了:

  • channel 的零值是nil,必须初始化才能使用。
  • 发送的是数据的副本,但是发送指针或引用类型不是 goroutine 安全的,receiver 接收到的数据可能被 sender 修改。
  • 往一个已经 closed 的 channel 中发送数据会导致 panic,往 nil channel 发送数据会 block。从一个 nil channel 中接收数据会 block,从一个被 close 的 channel 中接收数据不会 block,如果队列中有数据,则正常读取,否则立即返元素类型的零值。
  • select case 中,如果有多个 case 就绪,那么会随机选择一个 case 执行,select 中的 break 只能跳到 select 这一层,select 中一般配合 label 来使用 break。
  • channel 使用完可以不关闭, 如果要关闭, 最好是发送方来关闭.

参考资料

  1. https://about.sourcegraph.com/go/understanding-channels-kavya-joshi
  2. https://github.com/gophercon/2017-talks/blob/master/KavyaJoshi-UnderstandingChannels/Kavya Joshi - Understanding Channels.pdf