😥 整理不易,此资源只针对正式星主开放,
还请入驻星球后再来观看。

GOLANG ROADMAP

阅读模式

  • 沉浸
  • 自动
  • 日常
首页
Go学习
  • Go学院

    • Go小课
    • Go小考
    • Go实战
    • 精品课
  • Go宝典

    • 在线宝典
    • B站精选
    • 推荐图书
    • 精品博文
  • Go开源

    • Go仓库
    • Go月刊
  • Go下载

    • 视频资源
    • 文档资源
Go求职
  • 求职服务

    • 内推互助
    • 求职助力
  • 求职刷题

    • 企业题库
    • 面试宝典
    • 求职面经
Go友会
  • 城市
  • 校园
推广返利 🤑
实验区
  • Go周边
消息
更多
  • 用户中心

    • 我的信息
    • 推广返利
  • 玩转星球

    • 星球介绍
    • 角色体系
    • 星主权益
  • 支持与服务

    • 联系星主
    • 成长记录
    • 常见问题
    • 吐槽专区
  • 合作交流

    • 渠道合作
    • 课程入驻
    • 友情链接
author-avatar

GOLANG ROADMAP


首页
Go学习
  • Go学院

    • Go小课
    • Go小考
    • Go实战
    • 精品课
  • Go宝典

    • 在线宝典
    • B站精选
    • 推荐图书
    • 精品博文
  • Go开源

    • Go仓库
    • Go月刊
  • Go下载

    • 视频资源
    • 文档资源
Go求职
  • 求职服务

    • 内推互助
    • 求职助力
  • 求职刷题

    • 企业题库
    • 面试宝典
    • 求职面经
Go友会
  • 城市
  • 校园
推广返利 🤑
实验区
  • Go周边
消息
更多
  • 用户中心

    • 我的信息
    • 推广返利
  • 玩转星球

    • 星球介绍
    • 角色体系
    • 星主权益
  • 支持与服务

    • 联系星主
    • 成长记录
    • 常见问题
    • 吐槽专区
  • 合作交流

    • 渠道合作
    • 课程入驻
    • 友情链接
  • Go真实面试题汇总系列

    • Go基础篇
  • 宝典内容

    • 2. 对已经关闭的channel进行读写操作会发生什么?
    • 7.介绍一下通道
    • 8.channel实现方式/原理/概念/底层实现
    • 9. 同一个协程里面,对无缓冲channel同时发送和接收数据有什么问题
    • 10.channel和锁的对比
    • 11. channel的应用场景
    • 14. 向为nil的channel发送数据会怎么样
    • 28. channel有缓冲和无缓冲在使用上有什么区别?
    • 47. 管道是否能二次关闭?
    • 48. 管道关闭是否能读写?
    • 52. channel是否线程安全等;
    • 91. go利用channel通信的方式
    • 103. 被close的channel会有什么问题
    • 104. 分布式锁知道哪些?用channel如何实现?
    • 105. 集群用channel如何实现分布式锁
    • 118. Channel 的使用场景
    • 149. gochannel实现排序
    • 189. go 里的 syncLock 和 channel 的性能有区别吗?
    • 209. 了解过channel吗?
    • 210. channel内部数据结构有了解吗?
    • 216. 介绍下go的chan,chan可以做什么
    • 217. 如何实现限流器,请用chan实现一种限流器,也可以不用chan实现
    • 260. 如何判断channel已关闭?
    • 273. channel关闭以后,再往其发送或接收,会发生什么
    • 307. go channel底层实现
    • 331. 说说你了解的管道
    • 332. 看过管道的底层源码吗
    • 346. 实现chan struct{}不占用内存空间,实现传递信号的通道
    • 364. channel有了解吗?
    • 382. golang的管道怎么用;我说是channel
    • 390. channel和锁对比一下
    • 391. channel的应用场景
    • 395. chan的阻塞和非阻塞?
    • 405. channel怎么实现线程安全
    • 412. channelnobuffer以及buffer的区别。
    • 425. channel 底层
    • 426. sync包中的同步原语和channel的使用场景
    • 436. channel和共享内存有什么优劣势?

😥 整理不易,此资源只针对正式星主开放,
还请入驻星球后再来观看。

332. 看过管道的底层源码吗


企业题库解析小组

题目序号:(3620) 题目来源: 百度 频次: 1

答案:小强

  • Channel原理 channel的底层是一个hchan的结构体 基于锁实现的
type hchan struct {
	// chan 里元素数量
	qcount   uint
	// chan 底层循环数组的长度
	dataqsiz uint
	// 指向底层循环数组的指针
	// 只针对有缓冲的 channel
	buf      unsafe.Pointer
	// chan 中元素大小
	elemsize uint16
	// chan 是否被关闭的标志
	closed   uint32
	// chan 中元素类型
	elemtype *_type // element type
	// 已发送元素在循环数组中的索引
	sendx    uint   // send index
	// 已接收元素在循环数组中的索引
	recvx    uint   // receive index
	// 等待接收的 goroutine 队列
	recvq    waitq  // list of recv waiters
	// 等待发送的 goroutine 队列
	sendq    waitq  // list of send waiters

	// 保护 hchan 中所有字段
	lock mutex
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  • Channel 发送
  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  3. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 如果 channel 是 nil
	if c == nil {
		// 不能阻塞,直接返回 false,表示未发送成功
		if !block {
			return false
		}
		// 当前 goroutine 被挂起
		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	// 省略 debug 相关……

	// 对于不阻塞的 send,快速检测失败场景
	//
	// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
	// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
	// 2. channel 是缓冲型的,但循环数组已经装满了元素
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 锁住 channel,并发安全
	lock(&c.lock)

	// 如果 channel 关闭了
	if c.closed != 0 {
		// 解锁
		unlock(&c.lock)
		// 直接 panic
		panic(plainError("send on closed channel"))
	}

	// 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 对于缓冲型的 channel,如果还有缓冲空间
	if c.qcount < c.dataqsiz {
		// qp 指向 buf 的 sendx 位置
		qp := chanbuf(c, c.sendx)

		// ……

		// 将数据从 ep 处拷贝到 qp
		typedmemmove(c.elemtype, qp, ep)
		// 发送游标值加 1
		c.sendx++
		// 如果发送游标值等于容量值,游标值归 0
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// 缓冲区的元素数量加一
		c.qcount++

		// 解锁
		unlock(&c.lock)
		return true
	}

	// 如果不需要阻塞,则直接返回错误
	if !block {
		unlock(&c.lock)
		return false
	}

	// channel 满了,发送方会被阻塞。接下来会构造一个 sudog

	// 获取当前 goroutine 的指针
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil

	// 当前 goroutine 进入发送等待队列
	c.sendq.enqueue(mysg)

	// 当前 goroutine 被挂起
	goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

	// 从这里开始被唤醒了(channel 有机会可以发送了)
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		// 被唤醒后,channel 关闭了。坑爹啊,panic
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// 去掉 mysg 上绑定的 channel
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
  • Channel 接收
  1. 如果Channel为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
  2. 如果Channel已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
  3. 如果Channel的sendq队列中存在挂起的Goroutine,会将recvx索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
  4. 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
  5. 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// 省略 debug 内容 …………

	// 如果是一个 nil 的 channel
	if c == nil {
		// 如果不阻塞,直接返回 (false, false)
		if !block {
			return
		}
		// 否则,接收一个 nil 的 channel,goroutine 挂起
		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
		// 不会执行到这里
		throw("unreachable")
	}

	// 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
	// 当我们观察到 channel 没准备好接收:
	// 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
	// 2. 缓冲型,但 buf 里没有元素
	// 之后,又观察到 closed == 0,即 channel 未关闭。
	// 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
	// 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 加锁
	lock(&c.lock)

	// channel 已关闭,并且循环数组 buf 里没有元素
	// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
	// 也就是说即使是关闭状态,但在缓冲型的 channel,
	// buf 里有元素的情况下还能接收到元素
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(unsafe.Pointer(c))
		}
		// 解锁
		unlock(&c.lock)
		if ep != nil {
			// 从一个已关闭的 channel 执行接收操作,且未忽略返回值
			// 那么接收的值将是一个该类型的零值
			// typedmemclr 根据类型清理相应地址的内存
			typedmemclr(c.elemtype, ep)
		}
		// 从一个已关闭的 channel 接收,selected 会返回true
		return true, false
	}

	// 等待发送队列里有 goroutine 存在,说明 buf 是满的
	// 这有可能是:
	// 1. 非缓冲型的 channel
	// 2. 缓冲型的 channel,但 buf 满了
	// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
	// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// 缓冲型,buf 里有元素,可以正常接收
	if c.qcount > 0 {
		// 直接从循环数组里找到要接收的元素
		qp := chanbuf(c, c.recvx)

		// …………

		// 代码里,没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 清理掉循环数组里相应位置的值
		typedmemclr(c.elemtype, qp)
		// 接收游标向前移动
		c.recvx++
		// 接收游标归零
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// buf 数组里的元素个数减 1
		c.qcount--
		// 解锁
		unlock(&c.lock)
		return true, true
	}

	if !block {
		// 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
		unlock(&c.lock)
		return false, false
	}

	// 接下来就是要被阻塞的情况了
	// 构造一个 sudog
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

	// 待接收数据的地址保存下来
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.param = nil
	// 进入channel 的等待接收队列
	c.recvq.enqueue(mysg)
	// 将当前 goroutine 挂起
	goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

	// 被唤醒了,接着从这里继续执行一些扫尾工作
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139