Golang 通过 select...case 语句实现了对 channel 的多路复用以及非阻塞收发操作
本文将着重讲解以下问题:
编译器对 select 语句的优化select 如何随机选择 case当多个 channel 可以进行收发操作时,select 又会如何选择 case强烈建议阅读本文前,先看关于 channel 的源码解析 Go 深入源码 —— Channel
如果只关心 select 如何实现 channel 的多路复用,以及如何随机选择 case 的话可以直接看 selectgo 实现 channel 的多路复用
没有任何 case 的 select 语句会被编译器转换为runtime.block()函数,永久阻塞
只有一个 channel 操作,实际会被编译器转换为相应channel 相应的收发操作,其实和实际调用 data := <- ch 并没有什么区别
编译器会将 channel 的收发操作转换成 selectnbsend 或者 selectnbrecv/slectnbrecv2函数来完成非阻塞操作
channel 的非阻塞收发实际都是调用的 chansend 或者 chanrecv
// src/runtime/chan.go // 非阻塞发送 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { // block 参数为 false,非阻塞调用 return chansend(c,elem, false, getcallerpc()) } // 非阻塞接收 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return } func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { selected, *received = chanrecv(c, elem, false) return }而且 channel 会对于非阻塞收发操作有一些优化
// 非阻塞操作, block 参数为 false func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ... // 快速检测,非阻塞时,有些情况不需要获取锁就可以直接返回 // 非阻塞,未关闭,非缓冲+没有等待接收的 goroutine 或者 缓冲+缓冲区已满 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || ((c.dataqsiz < 0 && c.qcount == c.dataqsiz)) { // 返回 false,表示未发送成功 return false } lock(&c.lock) } func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool){ // ... // 快速检测,在非阻塞模式下,和发送一样有些条件不需要加锁就可以直接判断返回 // 非阻塞并且未关闭,非缓冲+没有待发送者或者有缓冲+缓冲为空 if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } lock(&c.lock) }在加锁前对 channel 进行判断,较少了加锁带来的性能问题 具体的 channel 收发操作,可以参考 Go 深入源码 —— Channel
多 case 的 select 语句实现了 channel 的多路复用,select 会阻塞,直到有 case 操作完成了收发操作
如果有多个 case 操作已经可以执行收发操作了,但是 select 会随机选择一个 case 完成收到操作,然后执行 case 下的相应逻辑
编译器实际会将 select 语句转换成 selectgo 函数,每个 case 操作都会被转换成 scase 结构,然后作为参数传递给 selectgo
在分配 selv 和 order时,使用temp` 方法,应该是为了保证数据会分配到栈中而不是堆中
多 case + default 同样也是调用 selectgo 函数,default 也会封装成 kind 为 caseDefault 的 scase 对象的
selectgo 并不会进行阻塞,如果所有 case 的 channel 都无法立即完成收发操作,那就会直接执行 default 操作
对于 select 语句,编译器会进行相应的转换优化操作,而多 channel 操作便是去调用 selectgo 函数来实现
select 中 case 操作,无论是 channel 收发还是 default 都会被转换为 scase 结构
const ( caseNil = iota // 表示 channel 为 nil 的情况 caseRecv caseSend caseDefault ) type scase struct { c *hchan //进行收发操作的 channel elem unsafe.Pointer // 收发的数据源 kind uint16 // 上述的四种 kind ... }scase 结构很简单,主要记录了 case 操作的类型,channel,以及 channel 收发的数据源
现在我们开始分析 select 的重头戏 selectgo 函数
func selectgo(cas0 *scase, order0 *uint16, ncase int)(int, bool) cas0 指向一个类型为 [ncases]scase 的数组order0 是一个指向[2*ncases]uint16,数组中的值都是 0selectgo 会返回选中的序号,如果是个接收操作,还会返回是否接收到一个值如果看过上文的话就会知道,编译器会将 scases 数组,还有用于排序的数组和 case 数量一起传给 selectgo
为什么 selectgo 还需要传递一个 order0,而不是直接根据 ncase 直接分配呢 编译转换会使用 temp 函数来构造生成数组的语句,而这个语句便可以保证数据会分配到栈上,而不是堆上,避免了不必要的堆分配
而 select 做的第一件事便是将 case0,order0 这些指针转换成相应的 slice 结构
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // [:n:n] 的方式会让slice 的 len 和 cap 相等 scases := cas1[:ncases:ncases] pollorder := order1[:ncases:ncases] lockorder := order1[ncases:][:ncases] // 对于 channel 为 nil 的收发操作,他们的 kind 被修改为 caseNil for i := range scases { case := &scases[i] if cas.c == nil && cas.kind != caseDefault { *cas = scase{} } }order1 会被分为 pollorder 和 lockorder,而这两个slice将会真正决定 select 的随机选择以及死锁问题
selectgo 是通过循环 scases 来挑选可以收发的 channel 然而循环时并不是按照 scases的顺序,而是 pollorder 中记录的顺序, 这样可以避免 channel 的饥饿问题
为了保证 select 随机选择 case,所以使用 fastrandn 来生成随机数
for i := 1; i < ncases; i++ { j := fastrandn(uint32(i+1)) pollorder[i] = pollorder[j] pollorder[j] = uint16(i) }pollorder 在开始的时候值都是 0,循环结束后值便是随机顺序的 scases 索引
selectgo 在查找 scases 中已经可以进行收发操作的 channel 前会先对所有的 channel 进行加锁操作
如果多个 goroutine 都需要锁定 ch1 ch2,而他们加锁的顺序不固定,那么很可能会出现死锁问题 这个时候,对加锁的顺序就有要求了,按照同样的顺序的话,没有竞争到 ch1.lock 的 goroutine,会等待加锁 ch1.lcok,而不会直接去加锁 ch2.lock
加锁前首先会对 lockorder 进行堆排序,生成由 case.c(*hchan) 来排序的 scases 索引顺序
func selectgo(cas0 *scase, order0 *uint16, ncase int)(int, bool) { ... // ... 对 looporder 堆排序 // selectgo 在查找 scases 前,先对所有 channel 加锁 sellock(scases, lockorder) ... }sellock 对地址相同的 channel 只会加锁一次
func sellock(scases []scases, lockorder []int16) { var c *hchan for _, o := range lockorder { c0 := scases[0].c // 根据加锁顺序获取 case // c 记录了上次加锁的 hchan 地址,如果和当前 *hchan 相同,那么就不会再次加锁 if c0 != nil && c0 != c { c = c0 lock(&c.lock) } } }加锁完成后,可以进入 selectgo 主循环逻辑了 主逻辑会分为三部分:
首先根据 pollorder 的顺序查找 scases 是否有可以立即收发的 channelchannel 都没有准备好,并且不存在 default,那么就将当前 goroutine 加入到 channel 相应的等待队列,然后等待收其他 goroutine 唤醒被唤醒后,再次找到满足条件的 channel根据 pollorder 记录的随机 scases 索引来遍历处理 case,然后根据 case.kind 来查看 channel 是否准备好,然后 goto 跳转到相应逻辑
case.kind 为 caseNil,说明 channel 为 nil,那么 continue,不进行任何处理
如果 channel 中有待发送的 goroutine, 跳转到 recv,调用 recv完成接收操作
recv: // src/runtime/chan.go recv(c, sg, cas.elem, func() { selunlock(cases, lockorder) }, 2) recvOK = true goto retc如果 channel 中有缓冲数据,那么跳转到 bufrecv,从缓冲区中获取数据
bufrecv: recvOK = true gp = chanbuf(c, c.recx) if cas.elem != nil { typedmemclr(c.elemtype, gp) } // ... channel 缓冲区调整 selunlock(scases, lockorder) goto retc如果 channel 已关闭,跳转到 rclose, 将接收值置为空值,recvOK 置为 false
rclose: selunlock(scases, lockorder) if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) } goto retc接收操作与正常接收操作类似,可以参考 channel 接收数据
对于发送操作会先判断 channel 是否已经关闭,跳转到 sclose,直接 panic
sclose: selunlock(scases, lockorder) panic(plainError("send on closed channel"))如果 channel 为关闭,并且有待接收队列不为空,说明 channel 的缓冲区为空,跳转到 send , 调用 send 函数,直接发送数据给待接收者
send: send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) goto retc如果缓冲区不为空的话,跳转到 bufsend,从缓冲区获取数据
bufsend: typedmemove(c.elemtype, chanbuf(c, c.sendx), cas.elem) // ... 调整缓冲区索引 selunlock(scases, lockorder) goto retcdfli 和 dfl 记录了 kind 为 caseDefault 的 case
如果所有 channel 都没有完成收发操作,那么就不会执行 goto 跳转,循环结束后判断 dfl != nil 存在 default,就直接返回 default 的索引 dfli
如果没有 channel 可以执行收发操作,并且没有 default case,那么就将当前 goroutine 加入到 channel 相应的收发队列中,等待被其他 goroutine 唤醒
func selectgo(cas0 *scase, order0 *uint16, ncase int)(int, bool) { // ... setlock(scases, lockorder) // ... 查看是否有准备好的 channel,或者存在 default case gp = getg() for _, casi := range lockorder { casi = int(casei) cas = &scases[casi] if cas.kind == caseNil { continue // channel 为 nil 直接跳过 } c = cas.c // 构造 sudog sg := acquireSudog() sg.g = gp sg.isSelect = true sg.elem = cas.elem sg.c = c // 加入相应等待队列 switch cas.kind { case caseRecv: c.recvq.enqueue(sg) case caseSend: c.sendq.enqueue(sg) } } // 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nil gp.param = nil // selparkcommit 会解锁所有 channel gopark(selparkcommit, nil,waitReasonSelect, traceEvGoBlockSelect, 1)将当前 goroutine 加入到每一个 channel 等待队列中 如果所有的 channel 都是 nil 的话就会被永久阻塞,不会被唤醒
selectgo 构建出来的 sudog 会将 isSelect 置为 true,这样时为了避免多个 channel 从等待队列中获取相同 goroutine 封装的 sudog
现在来看一下 等待队列的出队方法 dequeue
struct hchan { ... sendq *waitq recvq *waitq } func (q *waitq) dequeue() *sudog { for { sgp := q.first if sgp == nil { return nil } // ... // 原子操作,如果 spg.g.selectDone 不为 1,则修改为 0 // 为 1 说明 spg.g 已经被其他 channel 取出,直接跳过 sgp if sgp.isSelect && !atomic.Cas(spg.g.selectDone, 0, 1) { continue } } return sgp }当前goroutine 被唤醒后,将其他 sudog 从相应的 channel 等待队列中移除
selectgo 会根据变量 cas 的值来判断是收发操作唤醒还是关闭操作唤醒 关闭操作唤醒的话 gp.param 会被置为 nil,那么就不会赋值 cas 变量
func selectgo(cas0 *scase, order0 *uint16, ncase int)(int, bool) { // ... loop: // ... 查找 scases 中是否有准备完成的 channel gopark(selparkcommit, nil,waitReasonSelect, traceEvGoBlockSelect, 1) // 加锁所有的 channel sellock(scases, lockorder) // ... if cas == nil { // 由关闭操作唤醒 goroutine,那么再次回到 loop 处 goto loop } c = cas.c if cas.kind == caseRecv { recvOK = true } selunlock(scases, lockorder) return casi, recvOK }对于关闭操作唤醒,逻辑会回到 loop 中再次执行 scases 的检查操作
关闭操作唤醒 selectgo 后,在完成所有 channel 加锁前又有 channel 准备好收发操作了,那么在 loop 查询时,按照 pollorder 随机顺序,可能会选中刚刚准备好的 channel,而不是唤醒 selectgo 的 case
而对于收发操作,已经完成了值的拷贝,必然会选择这个 case,而不会再次去查询
使用 单 channel,单 channel + default 时编译器会对 select 进行编译转换,这些转换会带来性能的优化
多 channel:
首先会对所有的所有的 channel 进行加锁,而加锁过程会按照 channel 进行排序,避免了死锁的出现,并且不会对 channel 重复加锁
加锁完成后,会使用随机数来打乱查询的顺序,随机选择 case 避免 channel 饥饿,保证公平性
如果所有 channel 都没有准备好,并且有 default case,那么就选择 default case
如果没有 default,那么 select 就会阻塞,然后等待被其他 goroutine 唤醒 select
关闭操作唤醒 select 时,最终选择的并不一定是该 channel case,因为会再次使用随机顺序来选择合适的 channel
收发操作唤醒 select 时,必然会选择该 channel case
Go 语言设计与实现 - select