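Introduction: in Go, chan and goroutine are the most important embodiment of its CSP concurrency model. Based on Go 1.14, this article digs into the source code and analyzes their internal implementation in as much detail as possible.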
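Concurrent threads generally communicate using one of two models: shared memory and message passing. With shared memory you run into problems such as data races, which are handled with locks and atomic operations. Message passing sidesteps most data races, because data is handed from one process to another instead of being accessed by several processes at once. There are two common styles of message passing: channel-based and actor-based. Go is the representative channel-based language, and Erlang is the representative actor-based language.

CSP (Communicating Sequential Processes) treats the channel as a first-class object. It does not care which entity sends a message, only which channel the message is sent on. Go grew out of part of this theory, namely its process/channel model. Processes and channels do not belong to each other: a process can use any number of channels, and a channel does not care which process is communicating through it. Processes exchange messages through channels, which gives an ordered, blocking and predictable concurrency model. In Go, the process is the goroutine and the channel is the chan.

Note: the CSP book by Tony Hoare is available online at http://www.usingcsp.com/cspbook.pdf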
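A minimal sketch of the message-passing style described above (Go's "share memory by communicating"). A single owner goroutine holds the counter, and the other goroutines only send increments over a chan, so no lock protects the counter. The function name `countWithOwner` is made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithOwner lets only one goroutine touch total; workers communicate
// their increments through the incs channel instead of sharing total.
func countWithOwner(workers, perWorker int) int {
	incs := make(chan int)
	result := make(chan int)

	// The owner goroutine is the only one that reads or writes total.
	go func() {
		total := 0
		for n := range incs {
			total += n
		}
		result <- total
	}()

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				incs <- 1
			}
		}()
	}
	wg.Wait()
	close(incs) // no more senders: lets the owner's range loop finish
	return <-result
}

func main() {
	fmt.Println(countWithOwner(4, 1000)) // 4000
}
```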
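Key point: at its core, a chan is a queue. A buffered chan stores its elements in a circular queue (a ring buffer). An unbuffered chan has no buffer at all: each value is handed directly from a sender to a receiver, and only the waiting goroutines are queued.

The implementation lives in src/runtime/chan.go, which defines the struct `hchan` and the functions `makechan`, `chansend`, `chanrecv` and `closechan`. The main operations we use on a chan (creating it, writing to it, reading from it and closing it) are built around this struct and these functions, and the rest of this article focuses on them.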
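The FIFO behavior of a buffered chan can be observed directly. In this sketch the builtins `len` and `cap` report the values that correspond to `qcount` and `dataqsiz` in `hchan`. The helper `drain` is hypothetical and is only safe here because a single goroutine uses the chan.

```go
package main

import "fmt"

// drain reads everything currently buffered in c, in order.
// Only safe when no other goroutine uses c concurrently.
func drain(c chan int) []int {
	var out []int
	for len(c) > 0 {
		out = append(out, <-c)
	}
	return out
}

func main() {
	c := make(chan int, 3) // buffered: backed by a 3-slot circular buffer
	c <- 10
	c <- 20
	fmt.Println(len(c), cap(c)) // 2 3  (qcount, dataqsiz)
	fmt.Println(<-c)            // 10: first in, first out
	c <- 30
	c <- 40 // sendx wraps around to slot 0
	fmt.Println(drain(c)) // [20 30 40]
}
```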
Source code of `hchan`:
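```go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
```

- `qcount`: number of elements currently stored in `buf`.
- `dataqsiz`: length of the `buf` array, i.e. the capacity passed to `make`.
- `buf`: pointer to the `buf` array.
- `elemsize`: size of each element in `buf`.
- `closed`: whether the chan has been closed; 0 means open.
- `elemtype`: type of the chan's elements.
- `sendx`: index in `buf` where the next sent element will be written; together with `recvx` it implements the circular queue.
- `recvx`: index in `buf` of the next element to be received.
- `recvq`: goroutines waiting to receive. A goroutine ends up here when it tries to receive while `buf` is empty and no sender is waiting. Each waiting goroutine is wrapped in a `sudog` (the goroutine plus related information, such as where the value should be copied). The `sudog`s form a doubly linked list (`waitq` keeps `first` and `last`) served in FIFO order.
- `sendq`: goroutines waiting to send. A goroutine ends up here when it tries to send while `buf` is full (or, for an unbuffered chan, while no receiver is waiting). Like `recvq`, it is a FIFO linked list of `sudog`s.
- `lock`: protects all fields of `hchan` (and some fields of the `sudog`s blocked on this chan), so that sends, receives and close on the same chan are serialized.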
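To make the roles of `qcount`, `dataqsiz`, `buf`, `sendx` and `recvx` concrete, here is a toy ring buffer that reuses those field names. It is a hypothetical single-goroutine sketch, not runtime code: it has no lock, no wait queues, and stores only `int` values.

```go
package main

import "fmt"

// ring is a hypothetical toy mirroring the buffer bookkeeping of hchan.
// Not safe for concurrent use; the real hchan guards these fields with lock.
type ring struct {
	qcount   uint  // number of elements currently stored
	dataqsiz uint  // capacity of the circular queue
	buf      []int // backing array of dataqsiz elements
	sendx    uint  // index of the next slot to write
	recvx    uint  // index of the next slot to read
}

func newRing(size uint) *ring {
	return &ring{dataqsiz: size, buf: make([]int, size)}
}

// push mirrors the "space is available" branch of chansend.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false // full: a blocking sender would be parked on sendq
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// pop mirrors the "c.qcount > 0" branch of chanrecv.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false // empty: a blocking receiver would be parked on recvq
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // like typedmemclr on the vacated slot
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: buffer full
	v, _ := r.pop()
	fmt.Println(v, r.recvx) // 1 1
	r.push(3)               // sendx had wrapped to 0, so 3 lands in slot 0
	fmt.Println(r.buf)      // [3 2]
}
```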
Source code of `makechan`:
```go
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	// The element size must be smaller than 1<<16 (65536) bytes.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	// Alignment check (portable across platforms, faster access, less wasted
	// memory): hchanSize must be a multiple of maxAlign (8 bytes), and the
	// element alignment must not exceed it.
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// The buffer size must not overflow, must fit in the largest heap
	// allocation, and must not be negative.
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero (unbuffered chan, or zero-size
		// elements such as struct{}): allocate only the hchan struct.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call: one contiguous block with buf
		// starting right after the hchan header. It is allocated with a nil
		// type, so the GC does not need to scan it.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers: buf gets its own allocation typed with
		// the element type, so the GC can scan it.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
```

Note: `makechan` returns a pointer to `hchan`. This is why a chan behaves as a reference type in Go: assigning a chan or passing it to a function copies the pointer, not the channel itself.
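Two consequences of `makechan` can be checked from user code. A chan passed to a function still refers to the same `hchan`, and a negative size known only at run time makes `makechan` panic with "makechan: size out of range". The helper names `produce` and `capOrPanic` are illustrative.

```go
package main

import "fmt"

// produce gets a copy of the chan value, but the copy holds the same *hchan
// pointer, so the caller sees the element it sends.
func produce(c chan int) {
	c <- 42
}

// capOrPanic calls make with a size known only at run time. makechan
// rejects negative sizes with a panic, which is recovered and returned.
func capOrPanic(n int) (capacity int, msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	c := make(chan int, n)
	return cap(c), ""
}

func main() {
	c := make(chan int, 1)
	produce(c)
	fmt.Println(<-c) // 42

	_, msg := capOrPanic(-1)
	fmt.Println(msg) // makechan: size out of range
}
```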
Source code of `chansend`:
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// Sending on a nil chan does not crash right away.
	if c == nil {
		// Non-blocking send (select with default): report failure.
		if !block {
			return false
		}
		// Blocking send: park the goroutine forever, since nothing can wake
		// it. If no other goroutine can make progress, the runtime reports
		// a deadlock.
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	// Only active when built with the race detector (-race).
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation.
	// Non-blocking only: fail without locking if the chan is open but not ready
	// (unbuffered with no waiting receiver, or buffer full). A plain `c <- v`
	// passes block == true and skips this.
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Lock the chan.
	lock(&c.lock)

	// Sending on a closed chan panics.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// If a receiver is waiting, the buffer must be empty: take the first
	// sudog from recvq, hand the value to it directly and release the lock.
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// The buffer has a free slot: copy the value into buf.
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()             // the current goroutine
	mysg := acquireSudog()   // get a sudog to represent it on sendq
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// Append the sudog to sendq.
	c.sendq.enqueue(mysg)
	// Park the current goroutine; chanparkcommit releases c.lock.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	// Either a receiver took the value (directly, or by moving it into the
	// buffer), or the chan was closed (gp.param == nil), which panics below.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
```

The `send` function:
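`send` handles the direct-handoff case: it copies the value straight into the parked receiver's destination (`sendDirect`) and then makes the receiver runnable (`goready`). Before the runtime source, here is a toy model of that handoff. It assumes a hypothetical `waiter` type standing in for `sudog` and closes an ordinary channel in place of `goready`; it illustrates the idea only, not how the scheduler actually wakes goroutines.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for a runtime sudog: elem points at
// the parked receiver's destination variable, and closing ready plays
// the role of goready.
type waiter struct {
	elem  *int
	ready chan struct{}
}

// handoff mimics send(): copy the value straight into the receiver's
// variable (like sendDirect), clear elem, then wake the receiver.
func handoff(w *waiter, v int) {
	// A receive such as `<-c` that discards the value has no destination.
	if w.elem != nil {
		*w.elem = v
		w.elem = nil
	}
	close(w.ready)
}

func main() {
	var dst int
	w := &waiter{elem: &dst, ready: make(chan struct{})}
	done := make(chan int)
	go func() {
		<-w.ready // "parked" until the sender wakes it
		done <- dst
	}()
	handoff(w, 7)
	fmt.Println(<-done) // 7
}
```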
```go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// Only active with the race detector (-race); raceenabled is false otherwise.
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	// If the receiver has a destination (sg.elem != nil), copy the value
	// straight into it with sendDirect. A receive like `<-c` that discards
	// the value has no destination.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	// The receiver's goroutine.
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Make the receiver runnable: goready puts it in the current P's runnext
	// slot, so it is the next goroutine scheduled on that P.
	goready(gp, skip+1)
}
```

Source code of `chanrecv`:
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// Receiving from a nil chan: a non-blocking receive returns at once,
	// and a blocking receive parks the goroutine forever.
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not ready for receiving, we observe that the
	// channel is not closed. Each of these observations is a single word-sized read
	// (first c.sendq.first or c.qcount, and second c.closed).
	// Because a channel cannot be reopened, the later observation of the channel
	// being not closed implies that it was also not closed at the moment of the
	// first observation. We behave as if we observed the channel at that moment
	// and report that the receive cannot proceed.
	//
	// The order of operations is important here: reversing the operations can lead to
	// incorrect behavior when racing with a close.
	// Only reached by non-blocking receives; a plain `<-c` skips it.
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	// Closed and drained: return the zero value with received == false, with
	// no error and no panic. While buf still holds data, a closed chan keeps
	// delivering it through the paths below.
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	// A sender is waiting. Either the chan is unbuffered and the value is
	// copied directly from the sender, or the buffer is full: this goroutine
	// takes the element at recvx, and the first waiting sender's value is
	// written into that same slot.
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// The buffer has data: take it from buf.
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// gp.param == nil means closechan woke us, not a sender.
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
```

The `recv` function:
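When the buffer is full and a sender is parked, `recv` reuses a single slot: the element at `recvx` is the oldest one, and because the queue is full, the sender's value belongs in that same slot. Below is a hypothetical toy version over an `[]int` (`recvFromFull` and `realOrder` are not runtime functions), followed by a check with a real channel showing that FIFO order is preserved.

```go
package main

import "fmt"

// recvFromFull is a hypothetical model of the buffered branch of recv():
// buf is full (so sendx == recvx) and a sender is parked holding senderVal.
func recvFromFull(buf []int, recvx, sendx *int, senderVal int) int {
	head := buf[*recvx]     // copy data from queue to receiver
	buf[*recvx] = senderVal // copy data from sender to queue: same slot
	*recvx++
	if *recvx == len(buf) {
		*recvx = 0
	}
	*sendx = *recvx // the queue is still full, so sendx follows recvx
	return head
}

// realOrder checks the same guarantee on a real chan with capacity 1:
// the buffered element comes out before the blocked sender's element.
func realOrder() []int {
	c := make(chan int, 1)
	c <- 1
	go func() { c <- 2 }() // blocks because the buffer is full
	first := <-c
	second := <-c
	return []int{first, second}
}

func main() {
	buf := []int{1, 2, 3} // full buffer, oldest element at index 0
	recvx, sendx := 0, 0
	fmt.Println(recvFromFull(buf, &recvx, &sendx, 4)) // 1
	fmt.Println(buf, recvx, sendx)                    // [4 2 3] 1 1
	fmt.Println(realOrder())                          // [1 2]
}
```

Reading the toy buffer onward from `recvx` yields 2, 3 and then 4, which is exactly the order in which the values were sent.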
```go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// Unbuffered chan: copy straight from the sender.
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Buffered chan whose buffer is full.
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Wake the sender.
	goready(gp, skip+1)
}
```

Source code of `closechan`:
```go
func closechan(c *hchan) {
	// Closing a nil chan panics.
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	// Closing an already closed chan panics.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}

	// Mark the chan as closed.
	c.closed = 1

	var glist gList

	// Wake every goroutine parked on recvq or sendq: they are collected into
	// glist here and readied after the lock is dropped. Elements still in buf
	// are not discarded and can still be received.

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		// Waiting receivers get the zero value.
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil // nil tells the woken goroutine the chan was closed
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
```
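The user-visible rules traced through `chansend`, `chanrecv` and `closechan` can be checked with a short program. The helpers are illustrative. The comments about `block=false` assume the Go 1.14 lowering of a `select` with one case plus `default` to `selectnbsend`/`selectnbrecv`, which call `chansend`/`chanrecv` with `block` set to false.

```go
package main

import (
	"fmt"
	"sync"
)

// panicMsg runs f and returns the message of the panic it raises, or "".
func panicMsg(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

// trySend is a non-blocking send (chansend with block=false).
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

// tryRecv is a non-blocking receive (chanrecv with block=false).
func tryRecv(c chan int) (int, bool) {
	select {
	case v := <-c:
		return v, true
	default:
		return 0, false
	}
}

// releasedByClose starts n goroutines receiving from done, closes done and
// counts receives that report ok == false. Goroutines already parked on
// recvq are woken by closechan; later ones take the "closed and drained"
// path of chanrecv. Either way ok is false.
func releasedByClose(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-done; !ok {
				mu.Lock()
				released++
				mu.Unlock()
			}
		}()
	}
	close(done)
	wg.Wait()
	return released
}

func main() {
	var nilChan chan int
	fmt.Println(trySend(nilChan, 1))        // false: nil chan, non-blocking
	fmt.Println(trySend(make(chan int), 1)) // false: no waiting receiver
	buf := make(chan int, 1)
	fmt.Println(trySend(buf, 1)) // true: buffer has room
	fmt.Println(trySend(buf, 2)) // false: buffer full

	c := make(chan int, 2)
	c <- 1
	c <- 2
	close(c)
	v1, ok1 := <-c
	v2, ok2 := <-c
	v3, ok3 := <-c
	fmt.Println(v1, ok1, v2, ok2, v3, ok3) // 1 true 2 true 0 false

	fmt.Println(panicMsg(func() { c <- 3 }))         // send on closed channel
	fmt.Println(panicMsg(func() { close(c) }))       // close of closed channel
	fmt.Println(panicMsg(func() { close(nilChan) })) // close of nil channel
	fmt.Println(releasedByClose(5))                  // 5
}
```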