A Distributed Read-Write Mutex in Go
Go's default sync.RWMutex implementation does not perform well on multicore machines, because every reader contends on the same memory address when performing its atomic increment. This article explores an n-way RWMutex, also known as a "big reader" lock, which keeps a separate RWMutex for each CPU core. Readers take only their own core's read lock, while writers must take all of the locks in turn.
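In sketch form, the contract looks roughly like this. This is a minimal illustration with placeholder names, not the article's full code: currentCPU stands in for the CPUID-based lookup described in the next section, and the real implementation appears in the listings below.

package drwmutex

import "sync"

// DRWMutex keeps one sync.RWMutex per core: readers touch only the
// slot for the core they run on, while writers sweep every slot.
type DRWMutex []sync.RWMutex

// read locks only the current core's slot, and keeps the index so it
// releases the same slot it acquired even if the goroutine migrates.
func read(mx DRWMutex, currentCPU func() int, body func()) {
	c := currentCPU() // stand-in for the CPUID-based lookup
	mx[c].RLock()
	body()
	mx[c].RUnlock()
}

// write takes every slot's write lock in turn, which excludes the
// readers on all cores for the duration of the write.
func write(mx DRWMutex, body func()) {
	for i := range mx {
		mx[i].Lock()
	}
	body()
	for i := range mx {
		mx[i].Unlock()
	}
}

The asymmetry is the whole point: a read touches only one core's lock state, while the (rare) write pays a cost linear in the number of cores.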
Finding the current CPU
To decide which lock to take, readers use the CPUID instruction, which returns the APIC ID of the currently active CPU without requiring a system call or a change to the runtime. This works on both Intel and AMD processors; ARM processors would need to use the CPU ID register instead. On systems with more than 256 processors, x2APIC must be used, and the EDX register from CPUID with EAX=0xb must be consulted instead. At program startup, a mapping from APIC ID to CPU index is built (via the CPU affinity system calls); the mapping is then static for the lifetime of the process. Since the CPUID instruction can be fairly expensive, goroutines only periodically refresh their estimate of which core they are running on. More frequent refreshes reduce contention on other cores' locks, but also increase the time spent executing CPUID during lock acquisition.
Stale CPU information. The CPU information used when taking a lock may be stale by then (the goroutine may have been migrated to another core). As long as each reader remembers which lock it took, this affects only performance, not correctness. Such migration is also unlikely: operating system kernels try to keep threads on the same core to improve cache hit rates.
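A hedged sketch of the resulting reader loop follows; the helper names readLoop, lookup, and body are illustrative, while the benchmark in main.go below inlines the same pattern using cpus[cpu()] and the -c flag.

package drwmutex

import "sync"

// readLoop refreshes the (expensive) CPU lookup only every checkcpu
// acquisitions; in between, the cached index c may be stale. A stale
// index only means contending on another core's slot, never broken
// mutual exclusion, because we always unlock the slot we locked.
func readLoop(mx []sync.RWMutex, lookup func() int, checkcpu, iters uint64, body func()) {
	c := lookup() // e.g. cpus[cpu()] in the full listing
	for n := uint64(0); n < iters; n++ {
		if checkcpu != 0 && n%checkcpu == 0 {
			c = lookup() // periodically refresh the core estimate
		}
		mx[c].RLock()
		body()
		mx[c].RUnlock() // release exactly the slot we acquired
	}
}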
Performance
The performance characteristics of this scheme are affected by a large number of parameters. In particular, the frequency of the CPUID checks, the number of readers, the ratio of readers to writers, and how long readers hold the lock all matter a great deal. Since only a single writer is ever active at a time, the length of time a writer holds the lock does not affect the performance difference between sync.RWMutex and DRWMutex.
Experiments show that DRWMutex outperforms sync.RWMutex on multicore systems, in particular when the fraction of writers is below 1% and CPUID is called at most once every 10 lock acquisitions (exactly where the threshold lies depends on how long the lock is held). Even with few cores, DRWMutex beats sync.RWMutex for the common class of applications that choose sync.RWMutex over sync.Mutex in the first place.
The graph below shows the mean performance over 10 runs as the number of cores increases:
drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100
[Figure: DRWMutex and sync.RWMutex performance comparison]
Error bars denote the 25th and 75th percentiles. Note the dip at every 10th core: 10 cores make up one NUMA node on the benchmark machine, so adding another NUMA node makes cross-core communication more expensive. DRWMutex's performance still improves with more cores, because more readers can work in parallel than under sync.RWMutex.
See the go-nuts thread for further discussion.
cpu_amd64.s

#include "textflag.h"

// func cpu() uint64
TEXT ·cpu(SB),NOSPLIT,0ドル-8
	MOVL 0ドルx01, AX // version information
	MOVL 0ドルx00, BX // any leaf will do
	MOVL 0ドルx00, CX // any subleaf will do

	// call CPUID
	BYTE 0ドルx0f
	BYTE 0ドルxa2

	SHRQ 24,ドル BX // logical cpu id is put in EBX[31-24]
	MOVQ BX, ret+0(FP)
	RET
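On the Go side, cpu is declared without a body (func cpu() uint64 in main.go below); the Go toolchain links that declaration against the TEXT ·cpu symbol defined here.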
main.go

package main

import (
	"flag"
	"fmt"
	"math/rand"
	"os"
	"runtime"
	"runtime/pprof"
	"sync"
	"syscall"
	"time"
	"unsafe"
)

func cpu() uint64 // implemented in cpu_amd64.s

var cpus map[uint64]int

// determine mapping from APIC ID to CPU index by pinning the entire process to
// one core at a time, and seeing what its APIC ID is.
func init() {
	cpus = make(map[uint64]int)

	var aff uint64
	syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))

	n := 0
	start := time.Now()
	var mask uint64 = 1
Outer:
	for {
		for (aff & mask) == 0 {
			mask <<= 1
			if mask == 0 || mask > aff {
				break Outer
			}
		}

		ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
		if ret != 0 {
			panic(err.Error())
		}

		// what CPU do we have?
		<-time.After(1 * time.Millisecond)
		c := cpu()

		if oldn, ok := cpus[c]; ok {
			fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)
		}

		cpus[c] = n
		mask <<= 1
		n++
	}

	fmt.Printf("%d/%d cpus found in %v: %v\n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)

	// restore the original affinity mask
	ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
	if ret != 0 {
		panic(err.Error())
	}
}

type RWMutex2 []sync.RWMutex

// Lock takes the write lock on every per-core mutex in turn.
func (mx RWMutex2) Lock() {
	for core := range mx {
		mx[core].Lock()
	}
}

func (mx RWMutex2) Unlock() {
	for core := range mx {
		mx[core].Unlock()
	}
}

func main() {
	cpuprofile := flag.Bool("cpuprofile", false, "enable CPU profiling")
	locks := flag.Uint64("i", 10000, "Number of iterations to perform")
	write := flag.Float64("p", 0.0001, "Probability of write locks")
	wwork := flag.Int("w", 1, "Amount of work for each writer")
	rwork := flag.Int("r", 100, "Amount of work for each reader")
	readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")
	checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")
	flag.Parse()

	var o *os.File
	if *cpuprofile {
		o, _ = os.Create("rw.out") // assign rather than shadow, so o can be closed later
		pprof.StartCPUProfile(o)
	}

	readers_per_core := *readers / runtime.GOMAXPROCS(0)

	var wg sync.WaitGroup

	// benchmark a single sync.RWMutex shared by all readers
	var mx1 sync.RWMutex
	start1 := time.Now()
	for n := 0; n < runtime.GOMAXPROCS(0); n++ {
		for r := 0; r < readers_per_core; r++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				r := rand.New(rand.NewSource(rand.Int63()))
				for n := uint64(0); n < *locks; n++ {
					if r.Float64() < *write {
						mx1.Lock()
						x := 0
						for i := 0; i < *wwork; i++ {
							x++
						}
						_ = x
						mx1.Unlock()
					} else {
						mx1.RLock()
						x := 0
						for i := 0; i < *rwork; i++ {
							x++
						}
						_ = x
						mx1.RUnlock()
					}
				}
			}()
		}
	}
	wg.Wait()
	end1 := time.Now()

	t1 := end1.Sub(start1)
	fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)

	if *cpuprofile {
		pprof.StopCPUProfile()
		o.Close()
		o, _ = os.Create("rw2.out")
		pprof.StartCPUProfile(o)
	}

	// benchmark the distributed lock with one sync.RWMutex per core
	mx2 := make(RWMutex2, len(cpus))
	start2 := time.Now()
	for n := 0; n < runtime.GOMAXPROCS(0); n++ {
		for r := 0; r < readers_per_core; r++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				c := cpus[cpu()]
				r := rand.New(rand.NewSource(rand.Int63()))
				for n := uint64(0); n < *locks; n++ {
					if *checkcpu != 0 && n%*checkcpu == 0 {
						c = cpus[cpu()] // refresh the core estimate
					}
					if r.Float64() < *write {
						mx2.Lock()
						x := 0
						for i := 0; i < *wwork; i++ {
							x++
						}
						_ = x
						mx2.Unlock()
					} else {
						mx2[c].RLock()
						x := 0
						for i := 0; i < *rwork; i++ {
							x++
						}
						_ = x
						mx2[c].RUnlock()
					}
				}
			}()
		}
	}
	wg.Wait()
	end2 := time.Now()

	pprof.StopCPUProfile()
	o.Close()

	t2 := end2.Sub(start2)
	fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
}
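Assuming both files sit in the same package directory on a Linux machine (the affinity syscalls are Linux-specific), the benchmark can plausibly be built and run as shown below; each run prints an mx1 line for sync.RWMutex and an mx2 line for the distributed variant:

$ go build -o drwmutex .
$ ./drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100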
Source: 开源中国社区 [http://www.oschina.net]
Title: A Distributed Read-Write Mutex in Go
URL: http://www.oschina.net/translate/distributed-read-write-mutex-in-go
Translators: BuN_Ny, OSC技术周刊, eason02
Original English article: Distributed Read-Write Mutex in Go