Golang源码分析 - channel通道

Channel

本文是Go源码分析系列的第四篇,接着记录本人深入学习Go的过程。不得不说Go的源码还是值得一读的,当然最好带着目的去阅读源码,可跳过一些不重要的片段,这样才能高效到达自己的目的。阅读的过程中难免遇到不懂的,没事,善于利用 搜索引擎,但切记 不可盲目 复制。

整体下来阅读Go源码给我最直观的感受有几点 :

  • 指针无处不在!不过好在我是C++转Go的,这倒是没什么大问题。可能后面会写一篇关于 unsafe.Pointer 的介绍
  • Go本身语法简洁且代码格式化,阅读起来不是那么令人不适。
  • Go采用包机制,通过包名访问包内函数,但是这存在一个问题:有时不知道该函数到底在哪一个文件。
  • 可以使用 VScode+delve 断点调试Go源码,在分析GMP调度模型时简直不要太好用。
  • Go源码中注释还挺多的…
  • Go源码中很多函数只有函数声明,没有函数体,一般是通过汇编代码完成的。

言归正传,在Go语言中要想要实现不同goroutine之间进行通信,一般采取 channel(即通道) 实现,通过channel可实现不同goroutine之间共享数据。除此之外channel本身基于 阻塞队列(或者说 有锁循环队列),在某些情况下可以替换掉 sync 包下的条件变量和互斥锁的组合使用。

比如要实现一个Goroutine池,传统方法是互斥锁+条件变量,但是在Go中还有另外一种方法就是通过 channel 实现。我之前学完Go语法后就开始尝试用channel实现过一个简单的Goroutine池,具体Github: https://github.com/josexy/gopool ,当然channel用途不止这个。由于channel是一个天然的阻塞队列,因此还可以实现生产者-消费者、消息传递、并发控制等。

下面是一个非常简单的例子

1
2
3
4
5
6
7
8
func main() {
// 不带缓冲的channel
ch := make(chan int)
go func() {
ch <- 1000
}()
fmt.Println(<-ch)
}

但是如果使用不当,即下面的这个例子,编译器会检测出存在死锁: fatal error: all goroutines are asleep - deadlock!,这很好理解,由于没有数据发送到给channel,主线程将会永久阻塞。

但是编译器并不是所有的情况都会检测出死锁,因此在使用channel时需要编写者规范代码。

1
2
3
4
5
6
7
8
9
func main() {
// 不带缓冲的channel
ch := make(chan int)
go func() {
// 注释
// ch <- 1000
}()
fmt.Println(<-ch)
}

Go语言提供了一种称为 通信顺序进程 CSP(Communicating Sequential Processes) 的模型,它是通过 通信的方式实现共享内存,而 不应该 通过共享内存的方式实现通信。需要注意的是,channel内部采用 有锁的循环队列 来保存数据。

一般来说Go有两种类型的channel:

  • 带缓冲的channel :make(chan int, 2),需要传入一个容量大小
  • 不带缓冲的channel : make(chan int),默认容量大小为0

使用channel:

  • <-chan
  • chan<-
  • 关闭 close(chan)
  • channel长度 len(ch)
  • channel容量 cap(ch)

对于channel的使用可用下面的表格来表示读写关闭的情况

操作 nil的channel 正常channel 已关闭channel
<- ch 阻塞 成功或阻塞 读到零值
ch <- 阻塞 成功或阻塞 panic
close(ch) panic 成功 panic

区别于其他的编程语言,Go在语法上直接支持channel的发送和接受数据,使得操控channel变得更为简单,实际上Go编译器会在编译期间解析 <--> 并调用不同的函数: chanrecvchansend

channel的具体实现位于 runtime/chan.gohchan 是channel的内部实现,本质是一个 缓冲区数组实现的环形队列

前面说过,channel用于 不同Goroutine之间读写数据,因此就需要channel能够保存当前正在等待读和写的所有Goroutine,即:

  • hchanbuf 保存数据

  • hchanrecvqseneq 等待的Goroutine读写 buf 中的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// channel结构信息
type hchan struct {
// 队列中元素数量
qcount uint
// 队列最大容量,即可以存在多少个元素
dataqsiz uint
// 队列起始指针
buf unsafe.Pointer
// 元素的大小
elemsize uint16
// 当前channel是否已经关闭,1:关闭,0:未关闭
closed uint32
elemtype *_type
// 队列中已发送位置索引
sendx uint // send index
// 队列中已接收位置索引
recvx uint // receive index

// 等待读channel的Goroutine队列
recvq waitq
// 等待写channel的Goroutine队列
sendq waitq

// 互斥锁,保证并发正确性
lock mutex
}

// 存放正在等待的Goroutine队列
type waitq struct {
first *sudog
last *sudog
}

image-20220410194541496

sudog 表示位于等待发送和接收队列里面的Goroutine,一般用于channel。而这个 sudog 是从当前正在运行的G所绑定的处理器P中的本地 sudog池获取,如果本地 sudog 池为空,则从全局调度器 sched 中转移一半的 sudog 到处理器P的本地 sudog 池中(这是为了减少锁竞争,因为访问全局调度器需要锁)。这里涉及到Go的GMP调度模型,可能有的朋友已经知道或者不知道GMP是什么,不过我后面可能会写一篇文章介绍Go中GMP模型(目前笔记快写好了,只是还没有整理完成)。

简单来说可以将sudog看作是一个位于等待队列里面的Goroutine。

获取一个sudog的源码位置: https://github.com/golang/go/blob/4a56ba1c453927256f231a8bcef316bb4b3aa68a/src/runtime/proc.go#L377。

另外下面sudog的注释可谓是非常的良心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g

next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool

// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool

parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

makechan

创建一个channel主要是通过 make 内建函数来完成。如果没有传入参数,即创建一个不带缓冲的channel,则默认 size=0

由于channel可以保存任意类型的元素,因此需要根据不同类型分配不同大小的内存空间,即:

  • size=0,表示创建不带缓冲的channel,则只分配 hchan 结构体大小 hchanSize 需要的内存空间即可,不需要为缓冲区队列分配内存空间
  • 如果元素不是指针类型,则分配的大小为:hchan本身大小hchanSize+每个元素大小*队列容量
  • 如果元素是指针类型,则为 hchan 单独开辟内存空间,缓冲区队列大小为每个元素大小*队列容量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
debugChan = false
)

func makechan(t *chantype, size int) *hchan {
elem := t.elem

// 存储元素需要的内存大小 = channel中元素大小 * 缓冲区容量大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0:
// 缓冲区大小为0,说明这是一个不带缓冲区的channel
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不是指针类型
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 缓冲区队列起始地址
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素是指针类型
c = new(hchan)
// 缓冲区队列起始地址
c.buf = mallocgc(mem, elem, true)
}

// channel中元素的大小
c.elemsize = uint16(elem.size)
// channel中元素的类型信息
c.elemtype = elem
// 缓冲区队列容量大小
c.dataqsiz = uint(size)

lockInit(&c.lock, lockRankHchan)

return c
}

func (c *hchan) raceaddr() unsafe.Pointer {
return unsafe.Pointer(&c.buf)
}

chansend

向channel发送数据通过 ch <- X 完成,实际上编译器会调用 chansend ,其中 block 参数表示该发送数据操作是否是阻塞的。

假设发送数据是阻塞的,则函数chansend()的工作流程大致为:

  • 向一个nil的channel发送数据将会阻塞当前的G。
  • 向一个已经 关闭 的channel发送数据panic。
  • 如果当前channel的 接收等待队列 recvq 中有 等待的sudog,则取出一个sudog并将数据直接发送过去,不必将数据缓存到channel的缓冲区队列中;否则将数据暂存到缓存队列里。
  • 如果channel的数据缓冲区队列 buf 没有满,说明可以将数据缓存到该队列中,通过 chanbuf() 计算队列下一个可以存储数据的地址。
  • 如果buf已满或者c.qcount=c.dataqsiz(即不带缓冲的channel),将 当前G数据对象 封装到sudog并加入到channel的 发送等待队列 recvq,最后挂起当前的Goroutine,直到唤醒。
  • 当G被唤醒后,如果channel已经关闭,则panic。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
func chansend1(c *hchan, elem unsafe.Pointer) {
// block=true,说明发送操作是阻塞的
chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
// 向一个nil的channel发送数据时将阻塞当前G
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

lock(&c.lock)

// 不能向一个已经closed的channel发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 如果在接收等待队列上存在正在等待的G,则直接将数据发送
// 不必将数据缓存到队列中
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 否则说明没有正在等待接收的G,因此需要将数据缓存到队列中
// 如果缓冲区队列未满
if c.qcount < c.dataqsiz {
// 将数据保存到缓冲区队列
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
// 循环队列
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 队列元素数量+1
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}

// 否则队列已满 或者 创建的不带缓冲的channel,则阻塞当前G
gp := getg()
// 获取一个sudog对象并设置其字段
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 此时 sudog.elem 表示发送者对象,即 ch <- x 中的x对象
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil

// 将sudog加入到channel的发送等待队列
c.sendq.enqueue(mysg)

atomic.Store8(&gp.parkingOnChan, 1)

// 当前 Goroutine 切换为等待状态并阻塞等待其他的Goroutine从 channel 接收数据
// 并将其唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)

if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
// 归还sudog对象
releaseSudog(mysg)
// 当前goroutine被唤醒后,如果发现channel已经被关闭了,则panic
// 也就是说不能向一个已经关闭的channel发送数据
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
1
2
3
4
// 计算缓冲区下一个可以存储数据的位置
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 是否有数据要发送
if sg.elem != nil {
// 将要发送的数据ep复制到接收者sg中
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将G重新放入处理器P的本地运行队列,等待被调度
goready(gp, skip+1)
}
1
2
3
4
5
6
7
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// sudog.elem 保存发送和接收的数据
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 复制
memmove(dst, src, t.size)
}

chanrecv

使用 <- 可从channel中读取数据并保存到接收对象,chanrecv() 函数大致流程如下:

  • 读取nil的channel,将会阻塞当前的Goroutine
  • 读取已关闭的channel,返回 (零值,false)
  • 如果当前channel的 发送等待队列 sendq 中有 等待的sudog,则取出一个 sudog ,并将该发送者sudog中的数据复制到接收对象中
  • 如果当前channel数据缓冲区队列有数据,则取出该元素并将数据复制到接收者中
  • 否则channel缓冲区队列为空,即无数据可读,因此将 当前G保存接收数据的对象 封装到 sudog 中并加入到 接收等待队列 receq
1
2
3
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 从一个nil的channel接收数据将会永久阻塞
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

lock(&c.lock)

// channel被关闭且不存在数据时,直接返回
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

// 从channel的发送等待队列中取出一个sudog
// 并将该发送者sudog中的数据复制到接收者
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

// 队列有数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
// 将该数据复制到接收对象
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 队列元素数量-1
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// 缓冲区队列没有数据可以读取,则将当前G加入到接收等待队列
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

// 此时 sudog.elem 表示接收者对象,比如 x <- ch 中的x对象
// 用于保存读取的数据
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil

// 加入到接收等待队列
c.recvq.enqueue(mysg)

atomic.Store8(&gp.parkingOnChan, 1)
// 阻塞等待被唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}

recv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 不带缓冲的channel
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
// 将sendq队列中的数据sg.elem复制到接收者ep
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 从数据缓冲区队列中取出一个元素
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
// 并将其复制到接收者中
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 将数据取出后,会腾出一个位置,此时将从sendq队列中的取出的数据sg.elem复制到该位置gp
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

closechan

close(ch) 可以关闭一个通道,closechan()函数大致流程如下:

  • 关闭nil/已closed的channel,直接panic。
  • 获取当前channel等待队列里的所有G,最后将所有的G切换为runnnable状态并加入到处理器P的本地运行队列,之后等待调度器调度执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func closechan(c *hchan) {
// 关闭一个nil的chan,panic
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)

// 关闭一个已经closed的chan,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

// 标记为已关闭
c.closed = 1

// 保存channel中所有等待队列的G
var glist gList

// release all readers
for {
// 接收等待队列
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}

// release all writers (they will panic)
for {
// 发送等待队列
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 唤醒G,将G切换为runnnable状态并加入到处理器P的本地运行队列,等待调度器调度执行
goready(gp, 3)
}
}

总结

本文简单介绍了channel通道的原理,整体来看channel源码不是很复杂。一般来说channel不需要用户显示关闭,Go会自动进行GC。总之需要记住下面这个表:

操作 nil的channel 正常channel 已关闭channel
<- ch 阻塞 成功或阻塞 读到零值
ch <- 阻塞 成功或阻塞 panic
close(ch) panic 成功 panic

参考