Go中的channel的实现
在Go中最常见的就是通信顺序进程(Communicating sequential processes,CSP)的并发模型,通过共享通信,来实现共享内存,这里就提到了channel.
Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Go 语言中的 Goroutine 会通过 Channel 传递数据。
Goroutine通过使用channel传递数据,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。
Channel 收发操作均遵循了先入先出(FIFO)的设计,具体规则如下:
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
Channel 通常会有以下三种类型:
- 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
- 异步 Channel — 基于环形缓存的传统生产者消费者模型;
chan struct{}
类型的异步Channel
的struct{}
类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;
Channel 在运行时使用 runtime.hchan
结构体表示:
type hchan struct {
qcount uint // 当前队列里还剩余元素个数
dataqsiz uint // 环形队列长度,即缓冲区的大小,即make(chan T,N) 中的N
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
elemtype *_type // 元素类型,用于数据传递过程中的赋值
sendx uint // 环形缓冲区的状态字段,它只是缓冲区的当前索引-支持数组,它可以从中发送数据
recvx uint // 环形缓冲区的状态字段,它只是缓冲区当前索引-支持数组,它可以从中接受数据
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex // 互斥锁,为每个读写操作锁定通道,因为发送和接受必须是互斥操作
}
type waitq struct {
first *sudog
last *sudog
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
其中hchan结构体中有五个字段是构建底层的循环队列:
* qcount — Channel 中的元素个数;
* dataqsiz — Channel 中的循环队列的长度;
* buf — Channel 的缓冲区数据指针;
* sendx — Channel 的发送操作处理到的位置;
* recvx — Channel 的接收操作处理到的位置;
2
3
4
5
通常, elemsize
和 elemtype
分别表示当前 Channel 能够收发的元素类型和大小.
sendq
和 recvq
存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表runtime.waitq
表示,链表中所有的元素都是runtime.sudog
结构.
waitq
表示一个在等待列表中的 Goroutine,该结构体中存储了阻塞的相关信息以及两个分别指向前后runtime.sudog
的指针。
channel 在Go中是通过make关键字创建,编译器会将make(chan int,10).
创建管道:
runtime.makechan
和 runtime.makechan64
会根据传入的参数类型和缓冲区大小创建一个新的 Channel 结构,其中后者用于处理缓冲区大小大于 2 的 32 次方的情况.
这里我们来详细看下makechan
函数:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Channel 中根据收发元素的类型和缓冲区的大小初始化 runtime.hchan
结构体和缓冲区:
arena区域就是我们所谓的堆区,Go动态分配的内存都是在这个区域,它把内存分割成8KB大小的页,一些页组合起来称为mspan。
bitmap区域标识arena区域哪些地址保存了对象,并且用4bit标志位表示对象是否包含指针、GC标记信息。bitmap中一个byte大小的内存对应arena区域中4个指针大小(指针大小为 8B )的内存,所以bitmap区域的大小是512GB/(4*8B)=16GB。
此外我们还可以看到bitmap的高地址部分指向arena区域的低地址部分,这里bitmap的地址是由高地址向低地址增长的。
spans区域存放mspan(是一些arena分割的页组合起来的内存管理基本单元,后文会再讲)的指针,每个指针对应一页,所以spans区域的大小就是512GB/8KB*8B=512MB。
除以8KB是计算arena区域的页数,而最后乘以8是计算spans区域所有指针的大小。创建mspan的时候,按页填充对应的spans区域,在回收object时,根据地址很容易就能找到它所属的mspan。