分享
  1. 首页
  2. 文章

go中sync.Mutex源码解读

小中01 · · 566 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

什么是sync.Mutex
sync.Mutex是Go标准库中常用的一个排外锁。当一个goroutine获得了这个锁的拥有权后, 其它请求锁的goroutine就会阻塞在Lock方法的调用上,直到锁被释放。

var (
mu sync.Mutex
balance int
)

func main() {
Deposit(1)
fmt.Println(Balance())
}

func Deposit(amount int) {
mu.Lock()
balance = balance + amount
mu.Unlock()
}

func Balance() int {
mu.Lock()
b := balance
mu.Unlock()
return b
}
使用起来很简单,对需要锁定的资源,前面加Lock()锁定,完成的时候加Unlock()解锁就好了。

分析下源码
const (
// mutex is locked
// 是否加锁的标识
mutexLocked = 1 << iota
mutexWoken
mutexStarving
mutexWaiterShift = iota

// 公平锁
//
// 锁有两种模式:正常模式和饥饿模式。
// 在正常模式下,所有的等待锁的goroutine都会存在一个先进先出的队列中(轮流被唤醒)
// 但是一个被唤醒的goroutine并不是直接获得锁,而是仍然需要和那些新请求锁的(new arrivial)
// 的goroutine竞争,而这其实是不公平的,因为新请求锁的goroutine有一个优势——它们正在CPU上
// 运行,并且数量可能会很多。所以一个被唤醒的goroutine拿到锁的概率是很小的。在这种情况下,
// 这个被唤醒的goroutine会加入到队列的头部。如果一个等待的goroutine有超过1ms(写死在代码中)
// 都没获取到锁,那么就会把锁转变为饥饿模式。
//
// 在饥饿模式中,锁的所有权会直接从释放锁(unlock)的goroutine转交给队列头的goroutine,
// 新请求锁的goroutine就算锁是空闲状态也不会去获取锁,并且也不会尝试自旋。它们只是排到队列的尾部。
//
// 如果一个goroutine获取到了锁之后,它会判断以下两种情况:
// 1. 它是队列中最后一个goroutine;
// 2. 它拿到锁所花的时间小于1ms;
// 以上只要有一个成立,它就会把锁转变回正常模式。
// 正常模式会有比较好的性能,因为即使有很多阻塞的等待锁的goroutine,
// 一个goroutine也可以尝试请求多次锁。
// 饥饿模式对于防止尾部延迟来说非常的重要。
starvationThresholdNs = 1e6

)

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
// mutex锁当前的状态
state int32
// 信号量,用于唤醒goroutine
sema uint32
}
重点开看下state的几种状态:

大神写代码的思路就是惊奇,这里state又运用到了位移的操作

mutexLocked 对应右边低位第一个bit 1 代表锁被占用 0代表锁空闲

mutexWoken 对应右边低位第二个bit 1 表示已唤醒 0表示未唤醒

mutexStarving 对应右边低位第三个bit 1 代表锁处于饥饿模式 0代表锁处于正常模式

mutexWaiterShift 值为3,根据 mutex.state >> mutexWaiterShift 得到当前阻塞的goroutine数目,最多可以阻塞2^29个goroutine。

starvationThresholdNs 值为1e6纳秒,也就是1毫秒,当等待队列中队首goroutine等待时间超过starvationThresholdNs也就是1毫秒,mutex进入饥饿模式。

sync_mutex
Lock
加锁基本上就这三种情况:

1、可直接获取锁,直接加锁,返回;

2、有冲突 首先自旋,如果其他goroutine在这段时间内释放了该锁,直接获得该锁;如果没有就走到下面3;

3、有冲突,且已经过了自旋阶段,通过信号量进行阻塞;

1、刚被唤醒的 加入到等待队列首部;

2、新加入的 加入到等待队列的尾部。

4、有冲突,根据不同的模式做处理;

1、饥饿模式 获取锁

2、正常模式 唤醒,继续循环,回到2

// Lock locks m.
// 如果锁正在使用中,新的goroutine请求,将被阻塞,直到锁被释放
func (m *Mutex) Lock() {
// 原子的(cas)来判断是否加锁
// 如果可以获取锁,直接加锁,返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 这把锁,已经被别的goroutine持有
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64
// 是否处于饥饿模式
starving := false
// 用来存当前goroutine是否已唤醒
awoke := false
// 用来存当前goroutine的循环次数
iter := 0
// 记录下当前的状态
old := m.state
for {
// 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。
// 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考sync_runtime_canSpin的实现。
// 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 主动自旋
runtime_doSpin()
// 循环次数加一
iter++
old = m.state
continue
}

 // 到了这一步, state的状态可能是:
 // 1. 锁还没有被释放,锁处于正常状态
 // 2. 锁还没有被释放, 锁处于饥饿状态
 // 3. 锁已经被释放, 锁处于正常状态
 // 4. 锁已经被释放, 锁处于饥饿状态
 // new 复制 state的当前状态, 用来设置新的状态
 // old 是锁当前的状态
 new := old
 // 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁,
 // 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个.
 if old&mutexStarving == 0 {
 // 伪代码:newState = locked
 new |= mutexLocked
 }
 // 如果锁是被获取状态,或者饥饿状态
 // 就把期望状态中的等待队列的等待者数量+1(实际上是new + 8)
 if old&(mutexLocked|mutexStarving) != 0 {
 new += 1 << mutexWaiterShift
 }
 // 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
 // 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
 if starving && old&mutexLocked != 0 {
 // 设置为饥饿状态
 new |= mutexStarving
 }
 if awoke {
 // goroutine已从睡眠中唤醒,
 // 因此,无论哪种情况,我们都需reset
 if new&mutexWoken == 0 {
 throw("sync: inconsistent mutex state")
 }
 // 设置new设置为非唤醒状态
 // &^的意思是and not
 new &^= mutexWoken
 }
 // 原子(cas)更新state的状态
 // 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态.
 if atomic.CompareAndSwapInt32(&m.state, old, new) {
 // 如果说old状态不是饥饿状态也不是被获取状态
 // 那么代表当前goroutine已经通过CAS成功获取了锁
 if old&(mutexLocked|mutexStarving) == 0 {
 // 直接break
 break // locked the mutex with CAS
 }
 // 如果我们之前已经在等了,那就排在队伍前面。
 queueLifo := waitStartTime != 0
 // 如果说之前没有等待过,就初始化设置现在的等待时间
 if waitStartTime == 0 {
 waitStartTime = runtime_nanotime()
 }
 // queueLifo为true,也就是之前已经在等了
 // runtime_SemacquireMutex中的lifo为true,则将等待服务程序放在等待队列的开头。
 // 会被阻塞
 runtime_SemacquireMutex(&m.sema, queueLifo, 1)
 // 阻塞被唤醒
 // 如果当前goroutine已经是饥饿状态了
 // 或者当前goroutine已经等待了1ms(在上面定义常量)以上
 // 就把当前goroutine的状态设置为饥饿
 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
 old = m.state
 // 如果是饥饿模式
 if old&mutexStarving != 0 {
 // 如果goroutine被唤醒,互斥锁处于饥饿模式
 // 锁的所有权转移给当前goroutine,但是锁处于不一致的状态中:mutexLocked没有设置
 // 并且我们将仍然被认为是waiter。这个状态需要被修复。
 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
 throw("sync: inconsistent mutex state")
 }
 // 当前goroutine获取锁,waiter数量-1
 delta := int32(mutexLocked - 1<<mutexWaiterShift)
 // 如果当前goroutine非饥饿状态,或者说当前goroutine是队列中最后一个goroutine
 // 那么就退出饥饿模式,把状态设置为正常
 if !starving || old>>mutexWaiterShift == 1 {
 // 退出饥饿模式
 // 在这里这么做至关重要,还要考虑等待时间。
 // 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁。
 delta -= mutexStarving
 }
 // 原子的加上更新的值
 atomic.AddInt32(&m.state, delta)
 break
 }
 // 不是饥饿模式,就把当前的goroutine设为被唤醒
 awoke = true
 // 重置循环的次数
 iter = 0
 } else {
 // 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放
 // 那么就更新状态,重新开始循环尝试拿锁
 old = m.state
 }
}
if race.Enabled {
 race.Acquire(unsafe.Pointer(m))
}

}

const (
active_spin = 4
)

// src/runtime/proc.go
// Active spinning for sync.Mutex.
// go:linkname sync_runtime_canSpin sync.runtime_canSpin
// go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex是会被多个goroutine竞争的,所以自旋的次数需要控制
// active_spin的值为4
// 满足下面的添加才会发生自旋
// 1、自旋的次数小于active_spin也就是4
// 2、如果在单核的cpu是不能自旋的
// 3、 GOMAXPROCS> 1,并且至少有一个其他正在运行的P,并且本地runq为空。
// 4、当前P没有其它等待运行的G
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

// src/runtime/proc.go
// go:linkname sync_runtime_doSpin sync.runtime_doSpin
// go:nosplit
// procyield的实现是用汇编实现的
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}

// src/runtime/asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,0ドル-0
MOVL cycles+0(FP), AX
again:
// 让加锁失败时cpu睡眠30个(about)clock,从而使得读操作的频率低很多。流水线重排的代价也会小很多
PAUSE
SUBL 1,ドル AX
JNZ again
RET
深圳网站建设www.sz886.com


有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:小中01

查看原文:go中sync.Mutex源码解读

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
566 次点击
1 回复 | 直到 2025年05月09日 13:38:26
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏