分享
  1. 首页
  2. 文章

Go学习之Channel的一些模式

tyloafer · · 1308 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

除了在goroutine之间安全的传递数据之外,在看了《Concurrency in Go》之后,感慨channel还有那么多模式可供使用,在个人的学习中总结了以下几种常用的模式

pipeline

概念

我们以爬虫为例,一般爬虫分为如下步骤:

抓取页面 -> 解析页面 -> 整合数据分析 -> 分析结果入库

如果你把上面所有的步骤都放在一个函数里面处理,那会是多难看,多难以维护,从解耦角度考虑,我们可以起四个进程,分别承担不同的角色,例如,进程1负责抓取页面, 进程2负责解析页面,等等,各个进程拿到一个数据后,交给下一个进程来处理,这就是pipeline的基本思想,每个角色只负责关心自己的东西

示例

给定一个数n,执行 (n2 + 1) 2的操作

func pipeline() {
 generator := func(done chan interface{}, intergers ...int) <-chan int {
 inStream := make(chan int)
 go func() {
 defer close(inStream)
 for _, i := range intergers {
 select {
 case <-done:
 return
 case inStream <- i:
 }
 }
 }()
 return inStream
 }
 add := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {
 addInStream := make(chan int)
 go func() {
 defer close(addInStream)
 for i := range inStream {
 select {
 case <-done:
 return
 case addInStream <- i + increment:
 }
 }
 }()
 return addInStream
 }
 multiply := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {
 multiplyInStream := make(chan int)
 go func() {
 defer close(multiplyInStream)
 for i := range inStream {
 select {
 case <-done:
 return
 case multiplyInStream <- i * increment:
 }
 }
 }()
 return multiplyInStream
 }
 done := make(chan interface{})
 defer close(done)
 inStream := generator(done, []int{1, 2, 3, 4, 5, 6, 7}...)
 pipeline := multiply(done, add(done, multiply(done, inStream, 2), 1), 2)
 for v := range pipeline {
 fmt.Println(v)
 }
}

扇入扇出

在pipeline模型中,是一种高效的流式处理,但是假如pipeline中有a,b,c三个环节,b环节处理的特别慢,这时候就会影响到c环节的处理,如果增加b环节进程处理的数量,也就可以减弱b环节的慢处理对整个pipeline的影响,那么a->多个b的过程就是 扇入, 多个b环节输出数据到c环节,就是扇出

示例

func FanInFanOut() {
 producer := func(intergers ...int) <-chan interface{} {
 inStream := make(chan interface{})
 go func() {
 defer close(inStream)
 for _, v := range intergers {
 time.Sleep(5 * time.Second)
 inStream <- v
 }
 }()
 return inStream
 }
 fanIn := func(channels ...<-chan interface{},
 ) <-chan interface{} {
 var wg sync.WaitGroup
 multiplexStream := make(chan interface{})
 multiplex := func(c <-chan interface{}) {
 defer wg.Done()
 for i := range c {
 multiplexStream <- i
 }
 }
 wg.Add(len(channels))
 for _, c := range channels {
 go multiplex(c)
 }
 go func() {
 wg.Wait()
 close(multiplexStream)
 }()
 return multiplexStream
 }
 consumer := func(inStream <-chan interface{}) {
 for v := range inStream {
 fmt.Println(v)
 }
 }
 nums := runtime.NumCPU()
 producerStreams := make([]<-chan interface{}, nums)
 for i := 0; i < nums; i++ {
 producerStreams[i] = producer(i)
 }
 consumer(fanIn(producerStreams...))
}

tee- channel

概念

假如你从channel中拿到了一条sql语句,这时候,你想对这条sql记录,分析并执行,那你就需要将这条sql分别转发给这三个任务对应的channel,tee-channel 就是做这个事情的

示例

func teeChannel() {
 producer := func(intergers ...int) <-chan interface{} {
 inStream := make(chan interface{})
 go func() {
 defer close(inStream)
 for _, v := range intergers {
 inStream <- v
 }
 }()
 return inStream
 }
 tee := func(in <-chan interface{}) (_, _ <-chan interface{}) {
 out1 := make(chan interface{})
 out2 := make(chan interface{})
 go func() {
 defer close(out1)
 defer close(out2)
 for val := range in {
 out1, out2 := out1, out2
 for i := 0; i < 2; i++ {
 select {
 case out1 <- val:
 out1 = nil
 case out2 <- val:
 out2 = nil
 }
 }
 }
 }()
 return out1, out2
 }
 out1, out2 := tee(producer(1, 2, 3, 4, 5))
 for val1 := range out1 {
 fmt.Printf("out1: %v, out2: %v", val1, <-out2)
 }
}

桥接channel

概念

无论是前面提到的pipeline还是扇入扇出,每个goroutine都是对一个channel进行消费,但是实际场景中,可能会有多个channel来供给我们消费,而作为消费者,我们不关心这些值是来自于哪个channel,这种情况下,处理一个充满channel的channel可能会很多。如果我们定义一个功能,可以将充满channel的channel拆解为一个简单的channel,这将使消费者更专注于手头的工作,这就是桥接channel的思想

示例

func bridge() {
 gen := func() <-chan <-chan interface{} {
 in := make(chan (<-chan interface{}))
 go func() {
 defer close(in)
 for i := 0; i < 10; i++ {
 stream := make(chan interface{}, 1)
 stream <- i
 close(stream)
 in <- stream
 }
 }()
 return in
 }
 bridge := func(in <-chan (<-chan interface{})) <-chan interface{} {
 valStream := make(chan interface{})
 go func() {
 defer close(valStream)
 for {
 stream := make(<-chan interface{})
 select {
 case maybeStream, ok := <-in:
 if ok == false {
 return
 }
 stream = maybeStream
 }
 for val := range stream {
 valStream <- val
 }
 }
 }()
 return valStream
 }
 for val := range bridge(gen()) {
 fmt.Println(val)
 }
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:tyloafer

查看原文:Go学习之Channel的一些模式

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1308 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏