// sudog represents a g in a wait list, such as for sending/receiving // on a channel. // // sudog is necessary because the g ↔ synchronization object relation // is many-to-many. A g can be on many wait lists, so there may be // many sudogs for one g; and many gs may be waiting on the same // synchronization object, so there may be many sudogs for one object. // // sudogs are allocated from a special pool. Use acquireSudog and // releaseSudog to allocate and free them. type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops.
g *g
next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock.
acquiretime int64 releasetime int64 ticket uint32
// isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool
// success indicates whether communication over channel c // succeeded. It is true if the goroutine was awoken because a // value was delivered over channel c, and false if awoken // because c was closed. success bool
parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
makechan
创建一个channel主要是通过 make 内建函数来完成。如果没有传入参数,即创建一个不带缓冲的channel,则默认 size=0。
// 否则队列已满 或者 创建的不带缓冲的channel,则阻塞当前G gp := getg() // 获取一个sudog对象并设置其字段 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 此时 sudog.elem 表示发送者对象,即 ch <- x 中的x对象 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil
// 将sudog加入到channel的发送等待队列 c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 当前 Goroutine 切换为等待状态并阻塞等待其他的Goroutine从 channel 接收数据 // 并将其唤醒 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep)
if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil mysg.c = nil // 归还sudog对象 releaseSudog(mysg) // 当前goroutine被唤醒后,如果发现channel已经被关闭了,则panic // 也就是说不能向一个已经关闭的channel发送数据 if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } returntrue }
funcrecv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skipint) { // 不带缓冲的channel if c.dataqsiz == 0 { if ep != nil { // copy data from sender // 将sendq队列中的数据sg.elem复制到接收者ep recvDirect(c.elemtype, sg, ep) } } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. // 从数据缓冲区队列中取出一个元素 qp := chanbuf(c, c.recvx) // copy data from queue to receiver if ep != nil { // 并将其复制到接收者中 typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue // 将数据取出后,会腾出一个位置,此时将从sendq队列中的取出的数据sg.elem复制到该位置gp typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
funcclosechan(c *hchan) { // 关闭一个nil的chan,panic if c == nil { panic(plainError("close of nil channel")) }
lock(&c.lock)
// 关闭一个已经closed的chan,panic if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
// 标记为已关闭 c.closed = 1
// 保存channel中所有等待队列的G var glist gList
// release all readers for { // 接收等待队列 sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
// release all writers (they will panic) for { // 发送等待队列 sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 // 唤醒G,将G切换为runnnable状态并加入到处理器P的本地运行队列,等待调度器调度执行 goready(gp, 3) } }