Go中的锁如何实现
锁是一种同步机制,用于在多任务环境中限制资源的访问,以满足互斥需求。
go源码sync包中经常用于同步操作的方式:
- 原子操作
- 互斥锁
- 读写锁
- waitgroup
我们着重来分析下互斥锁和读写锁.
互斥锁:
下面是互斥锁的数据结构:
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32 // 互斥锁上锁状态枚举值如下所示
sema uint32 // 信号量,向处于Gwaitting的G发送信号
}
const (
mutexLocked = 1 << iota // 值为1,表示在state中由低向高第1位,意义:锁是否可用,0可用,1不可用,锁定中
mutexWoken // 值为2,表示在state中由低向高第2位,意义:mutex是否被唤醒
mutexStarving // 当前的互斥锁进入饥饿状态;
mutexWaiterShift = iota //值为2,表示state中统计阻塞在此mutex上goroutine的数目需要位移的偏移量
starvationThresholdNs = 1e6
2
3
4
5
6
7
8
9
10
11
12
13
14
state和sema两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放.
在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:
- mutexLocked 表示互斥锁的锁定状态;
- mutexWoken 表示从正常模式被从唤醒;
- mutexStarving 当前的互斥锁进入饥饿状态;
- waitersCount 当前互斥锁上等待的 Goroutine 个数;
sync.Mutex 有两种模式,正常模式和饥饿模式。
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。
但是刚被唤起的 Goroutine
与新创建的 Goroutine
竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。
如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。
相比于饥饿模式,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
互斥锁的加锁是靠 sync.Mutex.Lock 方法完成的, 当锁的状态是 0 时,将 mutexLocked
位置成 1:
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
如果互斥锁的状态不是 0 时就会调用 sync.Mutex.lockSlow
尝试通过自旋(Spinnig)等方式等待锁的释放,
这个方法是一个非常大 for 循环,它获取锁的过程:
- 判断当前 Goroutine 能否进入自旋;
- 通过自旋等待互斥锁的释放;
- 计算互斥锁的最新状态;
- 更新互斥锁的状态并获取锁;
那么互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放,是通过它的lockSlow方法, 由于自旋是一种多线程同步机制,所以呢当前的进程在进入自旋的过程中会一直保持对 CPU 的占用,持续检查某个条件是否为真。 通常在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用得当会对性能带来很大的增益,但是往往使用的不得当就会拖慢整个程序.
所以 Goroutine 进入自旋的条件非常苛刻:
- 互斥锁只有在普通模式才能进入自旋;
runtime.sync_runtime_canSpin
需要返回 true: a. 需要运行在多 CPU 的机器上; b. 当前的Goroutine 为了获取该锁进入自旋的次数小于四次; c. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin
和 runtime.procyield
并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间.
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。
通过几个不同的条件分别会更新 state 字段中存储的不同信息,mutexLocked
、mutexStarving
、mutexWoken
和 mutexWaiterShift
:
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
2
3
4
5
6
7
8
9
10
11
12
13
计算了新的互斥锁状态之后,就会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新该状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 通过 CAS 函数获取了锁
}
...
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
如果我们没有通过 CAS 获得锁,会调用 runtime.sync_runtime_SemacquireMutex
使用信号量保证资源不会被两个 Goroutine 获取。
runtime.sync_runtime_SemacquireMutex
会在方法中不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock
方法的剩余代码也会继续执行。
在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环. 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出.
互斥锁的解锁过程 sync.Mutex.Unlock
与加锁过程相比就很简单,该过程会先使用 sync/atomic.AddInt32
函数快速解锁,这时会发生下面的两种情况:
- 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
- 如果该函数返回的新状态不等于 0,这段代码会调用
sync.Mutex.unlockSlow
方法开始慢速解锁:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
sync.Mutex.unlockSlow
方法首先会校验锁状态的合法性, 如果当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex
中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁.
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
在正常模式下,这段代码会分别处理以下两种情况处理:
- 如果互斥锁不存在等待者或者互斥锁的
mutexLocked
、mutexStarving
、mutexWoken
状态不都为 0,那么当前方法就可以直接返回,不需要唤醒其他等待者; - 如果互斥锁存在等待者,会通过
sync.runtime_Semrelease
唤醒等待者并移交锁的所有权;
在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease
方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
- 如果互斥锁处于初始化状态,就会直接通过置位 mutexLocked 加锁;
- 如果互斥锁处于 mutexLocked 并且在普通模式下工作,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
- 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
runtime.sync_runtime_SemacquireMutex
函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine; - 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
- 当互斥锁已经被解锁时,那么调用
sync.Mutex.Unlock
会直接抛出异常; - 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位; - 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过
sync.runtime_Semrelease
唤醒对应的 Goroutine.
读写锁:
读写互斥锁sync.RWMutex
是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
sync.RWMutex 中总共包含5 个字段:
type RWMutex struct {
w Mutex // 复用互斥锁提供的能力
writerSem uint32 // 写等待读
readerSem uint32 // 读等待写
readerCount int32 // 存储了当前正在执行的读操作的数量
readerWait int32 // 当写操作被阻塞时等待的读操作个数
}
2
3
4
5
6
7
我们从写锁开始分析:
当我们想要获取写锁时,需要调用 sync.RWMutex.Lock
方法:
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 这里调用结构体持有的
sync.Mutex
的sync.Mutex.Lock
方法阻塞后续的写操作;
因为互斥锁已经被获取,其他 Goroutine 在获取写锁时就会进入自旋或者休眠;
- 调用
sync/atomic.AddInt32
方法阻塞后续的读操作:
如果仍然有其他 Goroutine 持有互斥锁的读锁(r != 0)
,该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex
进入休眠状态等待所有读锁所有者执行结束后释放 writerSem
信号量将当前协程唤醒。
写锁的释放会调用 sync.RWMutex.Unlock
方法:
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
解锁与加锁的过程正好相反,写锁的释放分为以下几个步骤:
- 调用
sync/atomic.AddInt32
函数将readerCount
变回正数,释放读锁; - 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine:
- 调用
sync.Mutex.Unlock
方法释放写锁;
获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死。
接着是读锁:
读锁的加锁方法 sync.RWMutex.RLock
就比较简单了,该方法会通过 sync/atomic.AddInt32
将 readerCount
加一:
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
如果RLock
该方法返回负数,其他 Goroutine 获得了写锁,当前 Goroutine 就会调用runtime.sync_runtime_SemacquireMutex
陷入休眠等待锁的释放;
如果RLock
该方法的结果为非负数,没有 Goroutine 获得写锁,当前方法就会成功返回.
当 Goroutine 想要释放读锁时,会调用如下所示的RUnlock
方法:
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
该方法会先减少正在读资源的readerCount
整数,根据 sync/atomic.AddInt32
的返回值不同会分别进行处理:
- 如果返回值大于等于零,表示读锁直接解锁成功.
- 如果返回值小于零 ,表示有一个正在执行的写操作,在这时会调用
rUnlockSlow
方法.
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
2
3
4
5
6
7
8
9
10
11
rUnlockSlow
该方法会减少获取锁的写操作等待的读操作数readerWait
并在所有读操作都被释放之后触发写操作的信号量,writerSem
,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。
其实读写互斥锁(sync.RWMutex),虽然提供的功能非常复杂,不过因为它是在互斥锁( sync.Mutex)的基础上,所以整体的实现上会简单很多。
因此呢:
- 调用
sync.RWMutex.Lock
尝试获取写锁时;
每次 sync.RWMutex.RUnlock
都会将 readerCount
其减一,当它归零时该 Goroutine 就会获得写锁, 将 readerCount
减少 rwmutexMaxReaders
个数以阻塞后续的读操作.
- 调用
sync.RWMutex.Unlock
释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。