Golang并发模型
wiseAaron · · 7074 次点击 · · 开始浏览控制并发有三种种经典的方式,一种是通过channel通知实现并发控制 一种是WaitGroup,另外一种就是Context。
1. 使用最基本通过channel通知实现并发控制
无缓冲通道
无缓冲的通道指的是通道的大小为0,也就是说,这种类型的通道在接收前没有能力保存任何值,它要求发送 goroutine 和接收 goroutine 同时准备好,才可以完成发送和接收操作。
从上面无缓冲的通道定义来看,发送 goroutine 和接收 gouroutine 必须是同步的,同时准备后,如果没有同时准备好的话,先执行的操作就会阻塞等待,直到另一个相对应的操作准备好为止。这种无缓冲的通道我们也称之为同步通道。
正式通过无缓冲通道来实现多 goroutine 并发控制
func main() {
ch := make(chan instruct{})
go func() {
ch <- struct{}{}
}()
fmt.Println(<-ch)
}
当主 goroutine 运行到 <-ch 接受 channel 的值的时候,如果该 channel 中没有数据,就会一直阻塞等待,直到有值。 这样就可以简单实现并发控制
2. 通过sync包中的WaitGroup实现并发控制
在 sync 包中,提供了 WaitGroup ,它会等待它收集的所有 goroutine 任务全部完成,在主 goroutine 中 Add(delta int) 索要等待goroutine 的数量。在每一个 goroutine 完成后 Done() 表示这一个goroutine 已经完成,当所有的 goroutine 都完成后,在主 goroutine 中 WaitGroup 返回返回。
fun main(){
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
http.Get(url)
}(url)
}
wg.Wait()
}
但是在Golang官网中,有这么一句话
- A WaitGroup must not be copied after first use.
翻译够来过来就是,在 WaitGroup 第一次使用后,不能被拷贝,因为会出现一下问题
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg sync.WaitGroup, i int) {
log.Printf("i:%d", i)
wg.Done()
}(wg, i)
}
wg.Wait()
log.Println("exit")
}
运行结果如下
2009年11月10日 23:00:00 i:4
2009年11月10日 23:00:00 i:0
2009年11月10日 23:00:00 i:1
2009年11月10日 23:00:00 i:2
2009年11月10日 23:00:00 i:3
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x1040a13c, 0x44bc)
/usr/local/go/src/runtime/sema.go:47 +0x40
sync.(*WaitGroup).Wait(0x1040a130, 0x121460)
/usr/local/go/src/sync/waitgroup.go:131 +0x80
main.main()
/tmp/sandbox894380819/main.go:19 +0x120
它提示我所有的 goroutine 都已经睡眠了,出现了死锁。这是因为 wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done操作是在 wg 的副本执行的。因此 Wait 就死锁了。
go 中四种引用类型有 slice, channel, function, map
改正方法一:
将匿名函数中wg的传入类型改为*sync.WaitGrou,这样就能引用到正确的WaitGroup了。改正方法二:
将匿名函数中的wg的传入参数去掉,因为Go支持闭包类型,在匿名函数中可以直接使用外面的wg变量
3. 在Go 1.7 以后引进的强大的Context上下文,实现并发控制
3.1 简介
在一些简单场景下使用 channel 和 WaitGroup 已经足够了,但是当面临一些复杂多变的网络并发场景下 channel 和 WaitGroup 显得有些力不从心了。比如一个网络请求 Request,每个 Request 都需要开启一个 goroutine 做一些事情,这些 goroutine 又可能会开启其他的 goroutine,比如数据库和RPC服务。所以我们需要一种可以跟踪 goroutine 的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的 Context,称之为上下文非常贴切,它就是goroutine 的上下文。它是包括一个程序的运行环境、现场和快照等。每个程序要运行时,都需要知道当前程序的运行状态,通常Go 将这些封装在一个 Context 里,再将它传给要执行的 goroutine 。context 包主要是用来处理多个 goroutine 之间共享数据,及多个 goroutine 的管理。
3.2 package context
context 包的核心是 struct Context,接口声明如下:
// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
// Done returns a channel that is closed when this `Context` is canceled
// or times out.
Done() <-chan struct{}
// Err indicates why this Context was canceled, after the Done channel
// is closed.
Err() error
// Deadline returns the time when this Context will be canceled, if any.
Deadline() (deadline time.Time, ok bool)
// Value returns the value associated with key or nil if none.
Value(key interface{}) interface{}
}
-
Done()返回一个只能接受数据的channel类型,当该context关闭或者超时时间到了的时候,该channel就会有一个取消信号 -
Err()在Done()之后,返回context取消的原因。 -
Deadline()设置该context cancel的时间点 -
Value()方法允许Context对象携带request作用域的数据,该数据必须是线程安全的。
Context 对象是线程安全的,你可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行 取消 操作时,所有 goroutine 都会接收到取消信号。
一个 Context 不能拥有 Cancel 方法,同时我们也只能 Done channel 接收数据。
背后的原因是一致的:接收取消信号的函数和发送信号的函数通常不是一个。
一个典型的场景是:父操作为子操作操作启动 goroutine,子操作也就不能取消父操作。
3.3 继承 context
context 包提供了一些函数,协助用户从现有的 Context 对象创建新的 Context 对象。
这些 Context 对象形成一棵树:当一个 Context 对象被取消时,继承自它的所有 Context 都会被取消。
Background 是所有 Context 对象树的根,它不能被取消。它的声明如下:
// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level `Context` for incoming requests.
func Background() Context
WithCancel 和 WithTimeout 函数 会返回继承的 Context 对象, 这些对象可以比它们的父 Context 更早地取消。
当请求处理函数返回时,与该请求关联的 Context 会被取消。 当使用多个副本发送请求时,可以使用 WithCancel 取消多余的请求。 WithTimeout 在设置对后端服务器请求超时时间时非常有用。 下面是这三个函数的声明:
// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// A CancelFunc cancels a Context.
type CancelFunc func()
// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithValue 函数能够将请求作用域的数据与 Context 对象建立关系。声明如下:
// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context
3.4 context例子
当然,想要知道 Context 包是如何工作的,最好的方法是看一个例子。
func childFunc(cont context.Context, num *int) {
ctx, _ := context.WithCancel(cont)
for {
select {
case <-ctx.Done():
fmt.Println("child Done : ", ctx.Err())
return
}
}
}
func main() {
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("parent Done : ", ctx.Err())
return // returning not to leak the goroutine
case dst <- n:
n++
go childFunc(ctx, &n)
}
}
}()
return dst
}
ctx, cancel := context.WithCancel(context.Background())
for n := range gen(ctx) {
fmt.Println(n)
if n >= 5 {
break
}
}
cancel()
time.Sleep(5 * time.Second)
}
在上面的例子中,主要描述的是通过一个channel实现一个为循环次数为5的循环,
在每一个循环中产生一个goroutine,每一个goroutine中都传入context,在每个goroutine中通过传入ctx创建一个子Context,并且通过select一直监控该Context的运行情况,当在父Context退出的时候,代码中并没有明显调用子Context的Cancel函数,但是分析结果,子Context还是被正确合理的关闭了,这是因为,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。
下面是运行结果:
3.5 Context 使用原则
- 不要把
Context放在结构体中,要以参数的方式传递 - 以
Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。 - 给一个函数方法传递
Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO -
Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递 -
Context是线程安全的,可以放心的在多个goroutine中传递
有疑问加站长微信联系(非本文作者)
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
关注微信- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码` - 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传
收入到我管理的专栏 新建专栏
控制并发有三种种经典的方式,一种是通过channel通知实现并发控制 一种是WaitGroup,另外一种就是Context。
1. 使用最基本通过channel通知实现并发控制
无缓冲通道
无缓冲的通道指的是通道的大小为0,也就是说,这种类型的通道在接收前没有能力保存任何值,它要求发送 goroutine 和接收 goroutine 同时准备好,才可以完成发送和接收操作。
从上面无缓冲的通道定义来看,发送 goroutine 和接收 gouroutine 必须是同步的,同时准备后,如果没有同时准备好的话,先执行的操作就会阻塞等待,直到另一个相对应的操作准备好为止。这种无缓冲的通道我们也称之为同步通道。
正式通过无缓冲通道来实现多 goroutine 并发控制
func main() {
ch := make(chan instruct{})
go func() {
ch <- struct{}{}
}()
fmt.Println(<-ch)
}
当主 goroutine 运行到 <-ch 接受 channel 的值的时候,如果该 channel 中没有数据,就会一直阻塞等待,直到有值。 这样就可以简单实现并发控制
2. 通过sync包中的WaitGroup实现并发控制
在 sync 包中,提供了 WaitGroup ,它会等待它收集的所有 goroutine 任务全部完成,在主 goroutine 中 Add(delta int) 索要等待goroutine 的数量。在每一个 goroutine 完成后 Done() 表示这一个goroutine 已经完成,当所有的 goroutine 都完成后,在主 goroutine 中 WaitGroup 返回返回。
fun main(){
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
http.Get(url)
}(url)
}
wg.Wait()
}
但是在Golang官网中,有这么一句话
- A WaitGroup must not be copied after first use.
翻译够来过来就是,在 WaitGroup 第一次使用后,不能被拷贝,因为会出现一下问题
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg sync.WaitGroup, i int) {
log.Printf("i:%d", i)
wg.Done()
}(wg, i)
}
wg.Wait()
log.Println("exit")
}
运行结果如下
2009年11月10日 23:00:00 i:4
2009年11月10日 23:00:00 i:0
2009年11月10日 23:00:00 i:1
2009年11月10日 23:00:00 i:2
2009年11月10日 23:00:00 i:3
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x1040a13c, 0x44bc)
/usr/local/go/src/runtime/sema.go:47 +0x40
sync.(*WaitGroup).Wait(0x1040a130, 0x121460)
/usr/local/go/src/sync/waitgroup.go:131 +0x80
main.main()
/tmp/sandbox894380819/main.go:19 +0x120
它提示我所有的 goroutine 都已经睡眠了,出现了死锁。这是因为 wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done操作是在 wg 的副本执行的。因此 Wait 就死锁了。
go 中四种引用类型有 slice, channel, function, map
改正方法一:
将匿名函数中wg的传入类型改为*sync.WaitGrou,这样就能引用到正确的WaitGroup了。改正方法二:
将匿名函数中的wg的传入参数去掉,因为Go支持闭包类型,在匿名函数中可以直接使用外面的wg变量
3. 在Go 1.7 以后引进的强大的Context上下文,实现并发控制
3.1 简介
在一些简单场景下使用 channel 和 WaitGroup 已经足够了,但是当面临一些复杂多变的网络并发场景下 channel 和 WaitGroup 显得有些力不从心了。比如一个网络请求 Request,每个 Request 都需要开启一个 goroutine 做一些事情,这些 goroutine 又可能会开启其他的 goroutine,比如数据库和RPC服务。所以我们需要一种可以跟踪 goroutine 的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的 Context,称之为上下文非常贴切,它就是goroutine 的上下文。它是包括一个程序的运行环境、现场和快照等。每个程序要运行时,都需要知道当前程序的运行状态,通常Go 将这些封装在一个 Context 里,再将它传给要执行的 goroutine 。context 包主要是用来处理多个 goroutine 之间共享数据,及多个 goroutine 的管理。
3.2 package context
context 包的核心是 struct Context,接口声明如下:
// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
// Done returns a channel that is closed when this `Context` is canceled
// or times out.
Done() <-chan struct{}
// Err indicates why this Context was canceled, after the Done channel
// is closed.
Err() error
// Deadline returns the time when this Context will be canceled, if any.
Deadline() (deadline time.Time, ok bool)
// Value returns the value associated with key or nil if none.
Value(key interface{}) interface{}
}
-
Done()返回一个只能接受数据的channel类型,当该context关闭或者超时时间到了的时候,该channel就会有一个取消信号 -
Err()在Done()之后,返回context取消的原因。 -
Deadline()设置该context cancel的时间点 -
Value()方法允许Context对象携带request作用域的数据,该数据必须是线程安全的。
Context 对象是线程安全的,你可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行 取消 操作时,所有 goroutine 都会接收到取消信号。
一个 Context 不能拥有 Cancel 方法,同时我们也只能 Done channel 接收数据。
背后的原因是一致的:接收取消信号的函数和发送信号的函数通常不是一个。
一个典型的场景是:父操作为子操作操作启动 goroutine,子操作也就不能取消父操作。
3.3 继承 context
context 包提供了一些函数,协助用户从现有的 Context 对象创建新的 Context 对象。
这些 Context 对象形成一棵树:当一个 Context 对象被取消时,继承自它的所有 Context 都会被取消。
Background 是所有 Context 对象树的根,它不能被取消。它的声明如下:
// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level `Context` for incoming requests.
func Background() Context
WithCancel 和 WithTimeout 函数 会返回继承的 Context 对象, 这些对象可以比它们的父 Context 更早地取消。
当请求处理函数返回时,与该请求关联的 Context 会被取消。 当使用多个副本发送请求时,可以使用 WithCancel 取消多余的请求。 WithTimeout 在设置对后端服务器请求超时时间时非常有用。 下面是这三个函数的声明:
// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// A CancelFunc cancels a Context.
type CancelFunc func()
// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithValue 函数能够将请求作用域的数据与 Context 对象建立关系。声明如下:
// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context
3.4 context例子
当然,想要知道 Context 包是如何工作的,最好的方法是看一个例子。
func childFunc(cont context.Context, num *int) {
ctx, _ := context.WithCancel(cont)
for {
select {
case <-ctx.Done():
fmt.Println("child Done : ", ctx.Err())
return
}
}
}
func main() {
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("parent Done : ", ctx.Err())
return // returning not to leak the goroutine
case dst <- n:
n++
go childFunc(ctx, &n)
}
}
}()
return dst
}
ctx, cancel := context.WithCancel(context.Background())
for n := range gen(ctx) {
fmt.Println(n)
if n >= 5 {
break
}
}
cancel()
time.Sleep(5 * time.Second)
}
在上面的例子中,主要描述的是通过一个channel实现一个为循环次数为5的循环,
在每一个循环中产生一个goroutine,每一个goroutine中都传入context,在每个goroutine中通过传入ctx创建一个子Context,并且通过select一直监控该Context的运行情况,当在父Context退出的时候,代码中并没有明显调用子Context的Cancel函数,但是分析结果,子Context还是被正确合理的关闭了,这是因为,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。
下面是运行结果:
3.5 Context 使用原则
- 不要把
Context放在结构体中,要以参数的方式传递 - 以
Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。 - 给一个函数方法传递
Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO -
Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递 -
Context是线程安全的,可以放心的在多个goroutine中传递