扫码订阅《 》或入驻星球,即可阅读文章!

GOLANG ROADMAP

阅读模式

  • 沉浸
  • 自动
  • 日常
首页
Go学习
  • Go学院

    • Go小课
    • Go视界
    • Go小考
    • Go实战
  • Go资源

    • 优质课程
    • 在线宝典
    • 资源下载
    • 帮找资源
训练营 🔥
  • Go体系课&实战训练营
  • 升值加薪陪跑训练营
Go求职
  • 求职刷题

    • 企业题库
    • 面试宝典
    • 求职面经
  • 求职服务

    • 内推互助
    • 求职助力
    • 内推公司
Go友会
  • 城市
  • 校园
推广返佣
  • 返佣排行
  • 返佣规则
  • 推广学院
实验区
  • Go周边
  • Go宝典

    • 推荐图书
    • 精品博文
  • Go开源

    • Go仓库
    • Go月刊
更多
  • 用户中心

    • 我的信息
    • 我的返佣
    • 我的消息
  • 玩转星球

    • 星球介绍
    • 星主权益
    • 吐槽专区
    • 成长记录
  • 合作交流

    • 商务合作
    • 讲师招募
    • 生态伙伴
author-avatar

GOLANG ROADMAP


首页
Go学习
  • Go学院

    • Go小课
    • Go视界
    • Go小考
    • Go实战
  • Go资源

    • 优质课程
    • 在线宝典
    • 资源下载
    • 帮找资源
训练营 🔥
  • Go体系课&实战训练营
  • 升值加薪陪跑训练营
Go求职
  • 求职刷题

    • 企业题库
    • 面试宝典
    • 求职面经
  • 求职服务

    • 内推互助
    • 求职助力
    • 内推公司
Go友会
  • 城市
  • 校园
推广返佣
  • 返佣排行
  • 返佣规则
  • 推广学院
实验区
  • Go周边
  • Go宝典

    • 推荐图书
    • 精品博文
  • Go开源

    • Go仓库
    • Go月刊
更多
  • 用户中心

    • 我的信息
    • 我的返佣
    • 我的消息
  • 玩转星球

    • 星球介绍
    • 星主权益
    • 吐槽专区
    • 成长记录
  • 合作交流

    • 商务合作
    • 讲师招募
    • 生态伙伴
  • 面试宝典系列

    • Golang面试题汇总
  • 宝典内容

    • 1.Golang中除了加Mutex锁以外还有哪些方式安全读写共享变量
    • 2.无缓冲Chan的发送和接收是否同步?
    • 3.Golang并发机制以及它所使用的CSP并发模型
    • 4.Golang中常用的并发模型
    • 5.Go中对nil的Slice和空Slice的处理是一致的吗?
    • 6.协程和线程和进程的区别?
    • 7.Golang的内存模型中为什么小对象多了会造成GC压力?
    • 8.Go中数据竞争问题怎么解决?
    • 9.什么是channel,为什么它可以做到线程安全?
    • 10.Golang垃圾回收算法?
    • 11.GC的触发条件
    • 12.Go的GPM如何调度?
    • 13.并发编程概念是什么?
    • 14.Go语言的栈空间管理是怎么样的?
    • 15.Goroutine和Channel的作用分别是什么?
    • 16.怎么查看Goroutine的数量?
    • 17.Go中的锁有哪些?
    • 18.怎么限制Goroutine的数量?
    • 19.Channel是同步的还是异步的?
    • 20.Goroutine和线程的区别?
    • 21.Go的Struct能不能比较?
    • 22.Go的defer原理是什么?
    • 23.Go的select可以用于什么?
    • 24.Context包的用途是什么?
    • 25.Go主协程如何等其余协程完再操作?
    • 26.Go的Slice如何扩容?
    • 27.Go中的map如何实现顺序读取?
    • 28.Go中CAS是怎么回事?
    • 29.Go中的逃逸分析是什么?
    • 30.Go值接收者和指针接收者的区别?
    • 31.Go的对象在内存中是怎样分配的?
    • 32.栈的内存是怎么分配的?
    • 33.堆内存管理怎么分配的?
    • 34.Go中的defer函数使用下面的两种情况下结果是什么?
    • 35.在Go函数中为什么会发生内存泄露?
    • 36.Go中new和make的区别?
    • 37.G0的作用?
    • 38.Go中的锁如何实现?
    • 39.Go中的channel的实现?
    • 40.Go中的map的实现
    • 41.Go中的http包的实现原理?
    • 42.Goroutine发生了泄漏如何检测?
    • 43.Go函数返回局部变量的指针是否安全?
    • 44.Go中两个Nil可能不相等吗?
    • 45.Goroutine和KernelThread之间是什么关系?

扫码订阅《 》或入驻星球,即可阅读文章!

38.Go中的锁如何实现?


面试宝典
Go中的锁如何实现

锁是一种同步机制,用于在多任务环境中限制资源的访问,以满足互斥需求。

go源码sync包中经常用于同步操作的方式:

  • 原子操作
  • 互斥锁
  • 读写锁
  • waitgroup

我们着重来分析下互斥锁和读写锁.

互斥锁:

下面是互斥锁的数据结构:

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32   // 互斥锁上锁状态枚举值如下所示
	sema  uint32  // 信号量,向处于Gwaitting的G发送信号
}
const (
	mutexLocked = 1 << iota // 值为1,表示在state中由低向高第1位,意义:锁是否可用,0可用,1不可用,锁定中
	mutexWoken   // 值为2,表示在state中由低向高第2位,意义:mutex是否被唤醒
	mutexStarving // 当前的互斥锁进入饥饿状态;
	mutexWaiterShift = iota  //值为2,表示state中统计阻塞在此mutex上goroutine的数目需要位移的偏移量
	starvationThresholdNs = 1e6 
1
2
3
4
5
6
7
8
9
10
11
12
13
14

state和sema两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。

互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放.

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked 表示互斥锁的锁定状态;
  • mutexWoken 表示从正常模式被从唤醒;
  • mutexStarving 当前的互斥锁进入饥饿状态;
  • waitersCount 当前互斥锁上等待的 Goroutine 个数;

sync.Mutex 有两种模式,正常模式和饥饿模式。

在正常模式下,锁的等待者会按照先进先出的顺序获取锁。

但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。

在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。

如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。

相比于饥饿模式,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。

互斥锁的加锁是靠 sync.Mutex.Lock 方法完成的, 当锁的状态是 0 时,将 mutexLocked 位置成 1:

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow 尝试通过自旋(Spinnig)等方式等待锁的释放,

这个方法是一个非常大 for 循环,它获取锁的过程:

  1. 判断当前 Goroutine 能否进入自旋;
  2. 通过自旋等待互斥锁的释放;
  3. 计算互斥锁的最新状态;
  4. 更新互斥锁的状态并获取锁;

那么互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放,是通过它的lockSlow方法, 由于自旋是一种多线程同步机制,所以呢当前的进程在进入自旋的过程中会一直保持对 CPU 的占用,持续检查某个条件是否为真。 通常在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用得当会对性能带来很大的增益,但是往往使用的不得当就会拖慢整个程序.

所以 Goroutine 进入自旋的条件非常苛刻:

  • 互斥锁只有在普通模式才能进入自旋;
  • runtime.sync_runtime_canSpin 需要返回 true: a. 需要运行在多 CPU 的机器上; b. 当前的Goroutine 为了获取该锁进入自旋的次数小于四次; c. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;

一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间.

处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。

通过几个不同的条件分别会更新 state 字段中存储的不同信息,mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

	new := old
	if old&mutexStarving == 0 {
		new |= mutexLocked
	}
	if old&(mutexLocked|mutexStarving) != 0 {
		new += 1 << mutexWaiterShift
	}
	if starving && old&mutexLocked != 0 {
		new |= mutexStarving
	}
	if awoke {
		new &^= mutexWoken
	}
1
2
3
4
5
6
7
8
9
10
11
12
13

计算了新的互斥锁状态之后,就会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新该状态:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // 通过 CAS 函数获取了锁
			}
			...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

如果我们没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex 使用信号量保证资源不会被两个 Goroutine 获取。

runtime.sync_runtime_SemacquireMutex 会在方法中不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock 方法的剩余代码也会继续执行。

在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环. 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出.

互斥锁的解锁过程 sync.Mutex.Unlock 与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32 函数快速解锁,这时会发生下面的两种情况:

  • 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
  • 如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 方法开始慢速解锁:
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

sync.Mutex.unlockSlow 方法首先会校验锁状态的合法性, 如果当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex 中止当前程序。

在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁.

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

在正常模式下,这段代码会分别处理以下两种情况处理:

  • 如果互斥锁不存在等待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0,那么当前方法就可以直接返回,不需要唤醒其他等待者;
  • 如果互斥锁存在等待者,会通过 sync.runtime_Semrelease唤醒等待者并移交锁的所有权;

在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease 方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;

互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,就会直接通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 并且在普通模式下工作,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过runtime.sync_runtime_SemacquireMutex函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;

互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

  • 当互斥锁已经被解锁时,那么调用 sync.Mutex.Unlock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过sync.runtime_Semrelease 唤醒对应的 Goroutine.

读写锁:

读写互斥锁sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

sync.RWMutex 中总共包含5 个字段:

type RWMutex struct {
	w           Mutex  // 复用互斥锁提供的能力
	writerSem   uint32 // 写等待读
	readerSem   uint32 // 读等待写
	readerCount int32  // 存储了当前正在执行的读操作的数量
	readerWait  int32  // 当写操作被阻塞时等待的读操作个数
}
1
2
3
4
5
6
7

我们从写锁开始分析:

当我们想要获取写锁时,需要调用 sync.RWMutex.Lock 方法:

func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  • 这里调用结构体持有的 sync.Mutex 的 sync.Mutex.Lock 方法阻塞后续的写操作;

因为互斥锁已经被获取,其他 Goroutine 在获取写锁时就会进入自旋或者休眠;

  • 调用 sync/atomic.AddInt32 方法阻塞后续的读操作:

如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0),该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒。

写锁的释放会调用 sync.RWMutex.Unlock 方法:

func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

解锁与加锁的过程正好相反,写锁的释放分为以下几个步骤:

  1. 调用 sync/atomic.AddInt32 函数将 readerCount 变回正数,释放读锁;
  2. 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine:
  3. 调用 sync.Mutex.Unlock 方法释放写锁;

获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死。

接着是读锁:

读锁的加锁方法 sync.RWMutex.RLock 就比较简单了,该方法会通过 sync/atomic.AddInt32 将 readerCount 加一:

func (rw *RWMutex) RLock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

如果RLock该方法返回负数,其他 Goroutine 获得了写锁,当前 Goroutine 就会调用runtime.sync_runtime_SemacquireMutex 陷入休眠等待锁的释放; 如果RLock该方法的结果为非负数,没有 Goroutine 获得写锁,当前方法就会成功返回.

当 Goroutine 想要释放读锁时,会调用如下所示的RUnlock方法:

func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

该方法会先减少正在读资源的readerCount 整数,根据 sync/atomic.AddInt32 的返回值不同会分别进行处理:

  • 如果返回值大于等于零,表示读锁直接解锁成功.
  • 如果返回值小于零 ,表示有一个正在执行的写操作,在这时会调用rUnlockSlow方法.
func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}
1
2
3
4
5
6
7
8
9
10
11

rUnlockSlow该方法会减少获取锁的写操作等待的读操作数readerWait并在所有读操作都被释放之后触发写操作的信号量,writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。

其实读写互斥锁(sync.RWMutex),虽然提供的功能非常复杂,不过因为它是在互斥锁( sync.Mutex)的基础上,所以整体的实现上会简单很多。

因此呢:

  • 调用 sync.RWMutex.Lock 尝试获取写锁时;

每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 就会获得写锁, 将 readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作.

  • 调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;

读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。