理解Golang channel
channel 是 golang 的核心特性之一,提供了 goroutine 通信的机制,简化了并发模式。
CSP
golang 通过 goroutine 和 channel 部分实现了 CSP(Communicating Sequential Process)。CSP 即通信顺序进程,是 Tony Hoare 于 1977 年提出的一种并发模型。CSP 模型由并发执行的实体(线程或者进程)所组成,实体之间通过发送消息进行通信,这里发送消息时使用的就是通道,或者叫 channel。CSP 模型的关键是关注 channel,而不关注发送消息的实体,golang 中 goroutine 对应 CSP 中并发执行的实体,channel 也就对应着 CSP 中的 channel。
channel基础
channel 底层是 hchan 结构体,源码在 src/runtime/chan.go。
// 程序 1
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // 队列大小, 值大于0表示有缓冲, 值等于0表示没有缓冲
buf unsafe.Pointer // 缓冲队列buffer的指针
elemsize uint16 // 单个元素大小
closed uint32 // 关闭标识符
elemtype *_type // 元素类型
sendx uint // 发送数据索引
recvx uint // 接收数据索引
recvq waitq // 等待接收数据的sudog(goroutine)链表
sendq waitq // 等待发送数据的sudog(goroutine)链表
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
sudog 表示在等待队列中的 goroutine。
// 程序 2
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
创建channel
通过内置的 make 函数可以创建带缓冲区或不带缓冲区的 channel:
// 程序 3
ch := make(chan Task) // 创建非缓冲channel
ch := make(chan Task, 3) // 创建缓冲channel
make 函数创建并实例化 hchan 结构体,并返回 hchan 的指针,所以我们可以直接将 channel 作为方法的参数。
创建 channel 的函数是 func makechan(t *chantype, size int) *hchan
// 程序 4
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 安全检测
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 如果队列或元素大小是0,则不分配buffer
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 将buf指向自身,不分配缓存空间
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 如果元素不包含指针,则分配一整块内存,用于hchan和buf
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 如果是指针类型,正常创建结构体,buf单独分配空间
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 设置channel的属性
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, ";
dataqsiz=", size, "\n")
}
return c
}
使用channel
发送和接收数据过程
G1 是生产者,G2 是消费者,ch
是容量为 3 的带缓冲 channel,初始的时候 hchan
结构体的 buf 为空,sendx
和 recvx
都为0。G1 获取锁,并将 task 发送到 channel,并增加 sendx,发送到 channel 中的数据其实是task的副本。然后 G2 获取锁,并从 channel 中获取数据,并增加 recvx,取到的数据依旧是 task 的副本。channel 受互斥锁保护,传递副本可以保证数据的安全。
整个过程没有内存共享(shared memory),充分体现了 CSP 的 do not communicate by sharing memory; instead, share memory by communicating。
blocking/unblocking过程
如果 G2 的读取速度比 G1 的写入速度慢,那么一段时间之后,channe l的 buffer 会被塞满。当 channel 满了之后 G1 继续往 channel 中发送数据。G1 会 block。基于 golang 的调度模型,暂停的其实是 goroutine,而不是 os 线程。
如果需要 block G1,G1 会创建一个 sudog,放到 channel 的 sendq
中,当 channel 的 buffer 有了空间时,G2 会从 sendq
中 pop 出 sudog
,取出 elem,将 G1 状态变成 runnable
,调度器就可以再次调度 G1 了。
如果 G2 先运行,这时候 channel 中没有数据,G2 从一个空的 channel 中读取数据,G2 也会 block,和 G1 block 的逻辑相似,G2 也会创建 sudog
,然后放到 recvq
中。
当此时 G1 向 channel 发送数据,runtime 会直接将 G1 要发送的数据 copy 到 G2 的栈空间,看起来就好像是 G1 直接将数据发送给了 G2,这个过程叫「direct send」,整个过程 G1 和 G2 都不需要再获取锁和读写 buffer。
非缓冲 channel 总是「direct send」的,如果 receiver 先运行,sender 直接将数据写入 receiver 的栈空间,如果 sennder 先运行,receiver 直接从 sudog 接收数据。
向channel发送数据的源码
发送数据对应的方法是 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
。
- 如果向 nil channel 发送数据,会一直block:
// 程序 5
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
gopark
表示将当前 goroutine 休眠,但是 unlockf 是 nil,所以 goroutine 会一直休眠。如果所有 goroutine 都处于休眠状态,在 runtime 的 checkdead()
会检测异常情况, 抛出 all goroutines are asleep - deadlock!
- 如果
recvq
中有等待的 sudog:
// 程序 6
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
这里直接将数据复制给 receiver,即上文提到的「direct send」。
- 如果 buffered channel 并且 buffer 有空间:
// 程序 7
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp: = chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock( & c.lock)
return true
}
通过 qcount
和 dataqsiz
属性判断 hchan.buf
是否有可用空间,如果 buffer 有空间,则将数据 copy 进 buffer。
- 如果 buffer 满了,或者没开启 buffer,则 block。
// 程序 8
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend,
traceEvGoBlockSend, 2)
从channel读取数据的源码
读取数据对应的方法是 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
。
- 如果从 nil channel 中接受数据会一直 block。
// 程序 9
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
和向 nil channel 发送数据类似,从 nil channel 接收数据也会一直 block。
- 从 closed 状态的channel接受数据:
// 程序 10
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
如果 buffer 中有数据(if raceenabled
),则返回 buffer 中的数据;如果 buffer 中没数据了,则返回默认值,并且第二个返回参数返回 false。
- 如果
sendq
中有等待发送的sudgo
// 程序 11
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
说明队列已满,如果没开启 buffer,则直接从 sender 读取数据。否则,从队列头读取数据,并把 sender 的数据放到队列尾(由于 buffer 是循环队列, 所以队列尾就是刚才读取数据的位置)。
- 如果队列中有数据
// 程序 12
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
如果队列中有数据,则直接从队列中读取数据。
- 如果队列中没数据,也没有 goroutine 向队列中发送数据
// 程序 13
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,
traceEvGoBlockRecv, 2)
如果队列中没数据,也没有 goroutine 向队列中发送数据,goroutine 会 block。
channel是否必须close
channel 可以不关闭,如果 channel 不再被使用,即使不关闭也会被回收。通常 close channel 会作为 channel 不会再有数据的控制信号,如果接收方不关心 channel 中是否还会有数据,那么没必要主动关闭 channel。可以参考 Design Question: Channel Closing。如果要关闭 channel,则最好是由发送方来关闭。
Note that it is only necessary to close a channel if the receiver is looking for a close. Closing the channel is a control signal on the channel indicating that no more data follows.
总结
这篇文章描述了 channel 的基本用法和内部的机制,我们从这篇文章了解到了:
- channel 的零值是nil,必须初始化才能使用。
- 发送的是数据的副本,但是发送指针或引用类型不是 goroutine 安全的,receiver 接收到的数据可能被 sender 修改。
- 往一个已经 closed 的 channel 中发送数据会导致 panic,往 nil channel 发送数据会 block。从一个 nil channel 中接收数据会 block,从一个被 close 的 channel 中接收数据不会 block,如果队列中有数据,则正常读取,否则立即返元素类型的零值。
select case
中,如果有多个 case 就绪,那么会随机选择一个 case 执行,select 中的 break 只能跳到 select 这一层,select 中一般配合 label 来使用 break。- channel 使用完可以不关闭, 如果要关闭, 最好是发送方来关闭.