分享
  1. 首页
  2. 文章

golang并发编程——goroutine使用指南

skh2015java · · 2078 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

并发是golang最有核心竞争力的功能,golang的并发依赖的并不是线程,而是协程。协程和线程有什么区别呢?最大的区别就是协程比线程更为轻量。默认情况中一个进程最大可以启动254个线程,这个数值也可以改为无限制,但主机资源消耗就会非常严重。而使用协程就不同了,一个进程可以轻轻松松启动上万个协程而毫无压力。

因此本篇文章就来说说在golang中如何创建使用协程。

golang设计协程的目的,一方面是为了提高并发效率,另外一方面就是尽可能发挥多核CPU的能力。golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。通过这样的设计,把每个CPU都充分调动起来,减少CPU空闲时间,提高了CPU吞吐量,无形当中也提高了I/O效率。

提到golang的协程,就不得不提到一个名词:管道(pipeline)。这里的管道和Linux系统中的pipe不是同一个意思,这里的管道指的是使用channel将多个处理步骤相连,形成的具有多级channel的数据流。一般来说,管道都是通过流入口读取数据,从流出口发送数据,读取数据之后都会调用某些函数来处理这些数据。

管道中的每一级都可以拥有多个流入口和流出口,但管道的首级和末级一般情况只有一个流入口或者流出口。拥有流出口的首级一般称之为数据源或者生产者,拥有流入口的末级一般称之为终点或者消费者。

这些技术解释,看上去很枯燥,我们通过一些简单的实例逐渐深入讲解。首先看下面的实例。在这个实例中,共分有三步来处理数据,第一步gen函数负责将传入的数据放到channel之中,当数据传完之后,关闭channel。代码如下:

func gen(nums ...int) <-chan int {
 out := make(chan int)
 go func() {
 for _, n := range nums {
 out <- n
 }
 close(out)
 }()
 return out
}

第二步,sq函数从channel中读取数据,并对每个数值进行相乘运算,然后再将运算后的数据发送到下一个channel当中。代码如下:

func sq(in <-chan int) <-chan int {
 out := make(chan int)
 go func() {
 for n := range in {
 out <- n * n
 }
 close(out)
 }()
 return out
}

最后一步,就是main函数了。main函数接受二阶段中发送的数据,然后输出这些数据知道channel关闭。代码如下:

func main() {
 // Set up the pipeline.
 c := gen(2, 3)
 out := sq(c)
 // Consume the output.
 fmt.Println(<-out) // 4
 fmt.Println(<-out) // 9
}

因为sq函数的参数类型和返回类型一致,所以sq函数可以合并处理,修改后的代码如下:

func main() {
 // Set up the pipeline and consume the output.
 for n := range sq(sq(gen(2, 3))) {
 fmt.Println(n) // 16 then 81
 }
}

到这里,以上三步就完成了一个非常基本的golang并发模型。但还存在很多缺陷,我们继续对它进行优化。首先第一步,就是将每一步处理单个channel,改为处理多个channel。

在golang并发模型中,存在两个概念:Fan-in(扇入)和Fan-out(扇出)。扇入指的是一个程序可以同时从多个channel中读取数据并且对其进行处理,直到收到明确的停止信号或者所有的channel被关闭。
扇出指的是多个程序可以同时从一个channel中读取数据并且对其进行处理,直到channel关闭。扇出值越大,CPU利用率越高,IO使用率也就越高。

下面的优化,就是针对扇入和扇出入手的。

我们将调用一次sq函数变为调用两次sq函数,同时引入一个merge函数来扇入处理结果数据。

func main() {
 in := gen(2, 3)
 // Distribute the sq work across two goroutines that both read from in.
 c1 := sq(in)
 c2 := sq(in)
 // Consume the merged output from c1 and c2.
 for n := range merge(c1, c2) {
 fmt.Println(n) // 4 then 9, or 9 then 4
 }
}

merge函数会通过启动一个协程将多个channel中的数据合并到一个channel之中。Golang语言中,向一个已经关闭的channel中发送数据会引发一个运行时异常,所以有必要在发送数据之前需要确保channel未被关闭。这里,我们使用sync.WaitGroup 做同步,只有数据发送完了,才会关闭channel。

func merge(cs ...<-chan int) <-chan int {
 var wg sync.WaitGroup
 out := make(chan int)
 // Start an output goroutine for each input channel in cs. output
 // copies values from c to out until c is closed, then calls wg.Done.
 output := func(c <-chan int) {
 for n := range c {
 out <- n
 }
 wg.Done()
 }
 wg.Add(len(cs))
 for _, c := range cs {
 go output(c)
 }
 // Start a goroutine to close out once all the output goroutines are
 // done. This must start after the wg.Add call.
 go func() {
 wg.Wait()
 close(out)
 }()
 return out
}

现在我们已经有了一个这样的模型:

  • 只有所有的数据都发送完成之后,才会关闭channel
  • 其它协程会一直接受数据,直到所有channel被关闭。

通过这个模型,我们可以循环接受并且处理数据。但我们的脚步不会就此停止,让我们继续往下优化。

目前所有的协程都是独立运行的,负责发送的协程可以不停的发送数据,接受数据的协程会不停的接受数据。那如果接受数据的协程不再需要这些数据了,那么又该如何通知上游的协程呢?

在上面的示例中,如果某一个阶段发生异常而退出,那么其他协程无法获知这个事件,就会发生一些资源泄漏。

 // Consume the first value from output.
 out := merge(c1, c2)
 fmt.Println(<-out) // 4 or 9
 return
 // Since we didn't receive the second value from out,
 // one of the output goroutines is hung attempting to send it.
}

所以下一步优化的方向就是协程之间的协同工作。先拿channel开刀,因为channel是可以带缓冲的。所以我们声明一个带有缓冲的channel:

c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1

channel的buffer是2,所以一次只能放入两个值,只有这两个值被处理了之后才能继续往里面放入新值。

这样,我们就可以修改一个gen函数。

func gen(nums ...int) <-chan int {
 out := make(chan int, len(nums))
 for _, n := range nums {
 out <- n
 }
 close(out)
 return out
}

回到merge函数中,我们也可以考虑在merge函数中使用一个带有缓冲的channel:

func merge(cs ...<-chan int) <-chan int {
 var wg sync.WaitGroup
 out := make(chan int, 1) // enough space for the unread inputs
 // ... the rest is unchanged ...

直接声明一个buffer=1的channel,不是一个好主意。因为目前这个值是已知的,但以后如果发生变化,那么还要修改代码,所以最好写成通用代码。但现在先这样用着吧。

这些貌似和协同工作,没关系。那下面就是有关系的代码了,加入main函数中准备要退出了,也就是不再接受数据了。main函数需要通知上游的协程停止发送数据,main函数如何做到这点呢?

main函数使用另外一个channel来完成这件事情,当需要退出时,main就通过done这个新增的channel发送消息,如下:

func main() {
 in := gen(2, 3)
 // Distribute the sq work across two goroutines that both read from in.
 c1 := sq(in)
 c2 := sq(in)
 // Consume the first value from output.
 done := make(chan struct{}, 2)
 out := merge(done, c1, c2)
 fmt.Println(<-out) // 4 or 9
 // Tell the remaining senders we're leaving.
 done <- struct{}{}
 done <- struct{}{}
}

main给done发送了一个空的结构体,但这个没有关系,我们关心的是done里面是否有值,而不是有什么值。其它协程如果需要接受信号,那么就需要使用select来处理done。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
 var wg sync.WaitGroup
 out := make(chan int)
 // Start an output goroutine for each input channel in cs. output
 // copies values from c to out until c is closed or it receives a value
 // from done, then output calls wg.Done.
 output := func(c <-chan int) {
 for n := range c {
 select {
 case out <- n:
 case <-done:
 }
 }
 wg.Done()
 }
 // ... the rest is unchanged ...

这种方法虽然可以实现通知的目的,但还有问题:main函数需要明确知道一共有多少个协程需要通知到,因此done <- struct{}{}需要不停的调用,直到所有的协程都被通知到位。如果有一些协程没有被通知到,呵呵,等着看异常吧。

为了解决这个问题,我们通过关闭done的方式来通知所有的协程。因为从一个已经关闭的channel中接受数据,会使当前协程立即退出。所以main函数中关闭了done,那么所有等待着从done接受关闭信号的协程们,会老老实实的自动退出。

func main() {
 // Set up a done channel that's shared by the whole pipeline,
 // and close that channel when this pipeline exits, as a signal
 // for all the goroutines we started to exit.
 done := make(chan struct{})
 defer close(done)
 in := gen(done, 2, 3)
 // Distribute the sq work across two goroutines that both read from in.
 c1 := sq(done, in)
 c2 := sq(done, in)
 // Consume the first value from output.
 out := merge(done, c1, c2)
 fmt.Println(<-out) // 4 or 9
 // done will be closed by the deferred call.
}

这样,merge函数就可以明确得知其下游已经不再需要处理数据了,merge就可以放心退出了。而sq也可以通过得知done已经关闭,而不再向外发送数据了。但这些函数再退出之时都会调用wg.Done来告诉main,它们都已经合法退出。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
 var wg sync.WaitGroup
 out := make(chan int)
 // Start an output goroutine for each input channel in cs. output
 // copies values from c to out until c or done is closed, then calls
 // wg.Done.
 output := func(c <-chan int) {
 defer wg.Done()
 for n := range c {
 select {
 case out <- n:
 case <-done:
 return
 }
 }
 }
 // ... the rest is unchanged ...
func sq(done <-chan struct{}, in <-chan int) <-chan int {
 out := make(chan int)
 go func() {
 defer close(out)
 for n := range in {
 select {
 case out <- n * n:
 case <-done:
 return
 }
 }
 }()
 return out
}

到这里,才算是真正的完成了协程之间的协同工作。



原文地址:http://blog.csdn.net/vikings_1001


有疑问加站长微信联系(非本文作者)

本文来自:CSDN博客

感谢作者:skh2015java

查看原文:golang并发编程——goroutine使用指南

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
2078 次点击 ∙ 1 赞
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏