分享
  1. 首页
  2. 文章

goroutine、channel原理

强某某 · · 1182 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

goroutine原理

概念介绍

  1. 并发

一个CPU上能同时执行多项任务,在很短时间内,CPU来回切换任务执行(在某段很短时间内执行程序a,然后又迅速得切换到程序b去执行),有时间上的重叠(宏观上是同时的,微观仍是顺序执行),这样看起来多个任务像是同时执行,这就是并发。

  1. 并行

当系统有多个CPU时,每个CPU同一时刻都运行任务,互不抢占自己所在的CPU资源,同时进行,称为并行。

  1. 进程

CPU在切换程序的时候,如果不保存上一个程序的状态(context--上下文),直接切换下一个程序,就会丢失上一个程序的一系列状态,于是引入了进程这个概念,用以划分好程序运行时所需要的资源。因此进程就是一个程序运行时候的所需要的基本资源单位(也可以说是程序运行的一个实体)。

  1. 线程

CPU切换多个进程的时候,会花费不少的时间,因为切换进程需要切换到内核态,而每次调度需要内核态都需要读取用户态的数据,进程一旦多起来,CPU调度会消耗一大堆资源,因此引入了线程的概念,线程本身几乎不占有资源,他们共享进程里的资源,内核调度起来不会那么像进程切换那么耗费资源。

  1. 协程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作执行者则是用户自身程序,goroutine也是协程。

Go并发模型

Go语言的并发处理参考了CSP(Communicating Sequential Process)模型。

CSP并发模型是在1970年左右提出的概念,属于比较新的概念,不同于传统的多线程通过共享内存来通信,CSP讲究的是"以通信的方式来共享内存"。

Go的CSP模型实现与原始的CSP实现有点差别:原始的CSP中channel里的任务都是立即执行的,而go语言为其增加了一个缓存,即任务可以先暂存起来,等待执行线程准备好再顺序执行。

Go的CSP并发模型,是通过goroutine和channel来实现的。

  • goroutine 是Go语言中并发的执行单位。有点抽象,其实就是和传统概念上的"线程"类似,可以理解为"线程"。
  • channel是Go语言中各个并发结构体(goroutine)之前的通信机制。通俗的讲,就是各个goroutine之间通信的"管道",有点类似于Linux中的管道。

生成一个goroutine的方式非常的简单:Go一下,就生成了。

go f()

通信机制channel也很方便,传数据用channel <- data,取数据用<-channel。
在通信过程中,传数据channel <- data和取数据<-channel必然会成对出现,因为这边传,那边
取,两个goroutine之间才会实现通信。
而且不管传还是取,必阻塞,直到另外的goroutine传或者取为止。

Go调度器GMP

Go语言运行时环境提供了非常强大的管理goroutine和系统内核线程的调度器, 内部提供了三种对象:Goroutine,Machine,Processor

Goroutine : 指应用创建的goroutine

Machine : 指系统内核线程。

Processor : 指承载多个goroutine的运行器

在宏观上说,Goroutine与Machine因为Processor的存在,形成了多对多(M:N)的关系。M个用户线程对应N个系统线程,缺点增加了调度器的实现难度

Goroutine是Go语言中并发的执行单位。Goroutine底层是使用协程(coroutine)实现,coroutine是一种运行在用户态的用户线程(参考操作系统原理:内核态,用户态)它可以由语言和框架层调度。Go在语言层面实现了调度器,同时对网络,IO库进行了封装处理,屏蔽了操作系统层面的复杂的细节,在语言层面提供统一的关键字支持。

三者与内核级线程的关系如下所示:


1.jpg

一个Machine会对应一个内核线程(K),同时会有一个Processor与它绑定。一个Processor连接一个或者多个Goroutine。Processor有一个运行时的Goroutine(上图中绿色的G),其它的Goroutine处于等待状态。

Processor的数量同时可以并发任务的数量,可通过GOMAXPROCS限制同时执行用户级任务的操作系统线程。GOMAXPROCS值默认是CPU的可用核心数,但是其数量是可以指定的。在go语言运行时环境,可以使用

runtime.GOMAXPROCS(MaxProcs)

来指定Processor数量。

默认数量为

func schedinit() {
 //设置最大的M数量
 sched.maxmcount = 10000
}
  • 当一个Goroutine创建被创建时,Goroutine对象被压入Processor的本地队列或者Go运行时全局Goroutine队列。
  • Processor唤醒一个Machine,如果Machine的waiting队列没有等待被 唤醒的Machine,则创建一个(只要不超过Machine的最大值,10000),Processor获取到Machine后,与此Machine绑定,并执行此Goroutine。
  • Machine执行过程中,随时会发生上下文切换。当发生上下文切换时,需要对执行现场进行保护,以便下次被调度执行时进行现场恢复。Go调度器中Machine的栈保存在Goroutine对象上,只需要将Machine所需要的寄存器(堆栈指针、程序计数器等)保存到Goroutine对象上即可。
  • 如果此时Goroutine任务还没有执行完,Machine可以将Goroutine重新压入Processor的队列,等待下一次被调度执行。
  • 如果执行过程遇到阻塞并阻塞超时,Machine会与Processor分离,并等待阻塞结束。此时Processor可以继续唤醒Machine执行其它的Goroutine,当阻塞结束时,Machine会尝试"偷取"一个Processor,如果失败,这个Goroutine会被加入到全局队列中,然后Machine将自己转入Waiting队列,等待被再次唤醒。

channel原理

channel数据结构

channel一个类型管道,通过它可以在goroutine之间发送和接收消息。它是Golang在语言层面提供的goroutine间的通信方式。

Go依赖于称为CSP(Communicating Sequential Processes)的并发模型,通过
Channel实现这种同步模式。

通过channel来实现通信:

package main
import (
 "fmt"
 "time"
)
func goRoutineA(a <-chan int) {
 val := <-a
 fmt.Println("goRoutineA:", val) }
func goRoutineB(b chan int) {
 val := <-b
 fmt.Println("goRoutineB:", val) }
 func main() {
 ch := make(chan int, 3)
 go goRoutineA(ch)
 go goRoutineB(ch)
 ch <- 3
 time.Sleep(time.Second) 
 }

channel结构体:

//path:src/runtime/chan.go
type hchan struct {
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列⻓度,即可以存放的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识关闭状态
elemtype *_type // 元素类型
sendx uint // 队列下标,指示元素写入时存放到队列中的位置
recvx uint // 队列下标,指示元素从队列的该位置读出
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex // 互斥锁,chan不允许并发读写
}

channel实现方式

chan内部实现了一个环形队列作为其缓冲区,队列的⻓度是创建chan时指定的。

下面展示了一个可缓存6个元素的channel示意图:


2.png
  • dataqsiz指示了队列⻓度为6,即可缓存6个元素
  • buf指向队列的内存,队列中还剩余两个元素
  • qcount表示队列中还有两个元素
  • sendx指示后续写入的数据存储的位置,取值[0, 6]
  • recvx指示从该位置读取数据, 取值[0, 6]

等待队列

从channel读数据,如果channel缓冲区为空或者没有缓冲区,当前goroutine会被阻塞。 向channel写数据,如果channel缓冲区已满或者没有缓冲区,当前goroutine会被阻塞。

被阻塞的goroutine将会挂在channel的等待队列中:

  • 因读阻塞的goroutine会被向channel写入数据的goroutine唤醒;
  • 因写阻塞的goroutine会被从channel读数据的goroutine唤醒;

以下展示了一个没有缓冲区的channel,有几个goroutine阻塞等待读数据:


3.png

注意,一般情况下recvq和sendq至少有一个为空。只有一个例外,那就是同一个goroutine使用select语句向channel一边写数据,一边读数据。

channel读写

创建channel的过程实际上是初始化hchan结构。其中类型信息和缓冲区⻓度由make语句传入,buf的大小则与元素大小和缓冲区⻓度共同决定。

func makechan(t *chantype, size int) *hchan {
 elem := t.elem
 // compiler checks this but be safe.
 if elem.size >= 1<<16 {
 throw("makechan: invalid channel element type")
 }
 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
 throw("makechan: bad alignment")
 }
 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
 if overflow || mem > maxAlloc-hchanSize || size < 0 {
 panic(plainError("makechan: size out of range"))
 }
 // Hchan does not contain pointers interesting for GC when elements
stored in buf do not contain pointers.
 // buf points into the same allocation, elemtype is persistent.
 // SudoG's are referenced from their owning thread so they can't be
collected.
 // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
 var c *hchan
 switch {
 case mem == 0:
 // Queue or element size is zero.
 c = (*hchan)(mallocgc(hchanSize, nil, true))
 // Race detector uses this location for synchronization.
 c.buf = c.raceaddr()
 case elem.ptrdata == 0:
 // Elements do not contain pointers.
 // Allocate hchan and buf in one call.
 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
 c.buf = add(unsafe.Pointer(c), hchanSize)
 default:
 // Elements contain pointers.
 c = new(hchan)
 c.buf = mallocgc(mem, elem, true)
 }
 c.elemsize = uint16(elem.size)
 c.elemtype = elem
 c.dataqsiz = uint(size)
 if debugChan {
 print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=",
elem.alg, "; dataqsiz=", size, "\n")
 }
 return c
 }
向channel写数据

向一个channel中写数据简单过程如下:

  1. 如果等待接收队列recvq不为空,说明缓冲区中没有数据或者没有缓冲区,此时直接从recvq
    取出G,并把数据写入,最后把该G唤醒,结束发送过程;
  2. 如果缓冲区中有空余位置,将数据写入缓冲区,结束发送过程;
  3. 如果缓冲区中没有空余位置,将待发送数据写入G,将当前G加入sendq,进入睡眠,等待被读goroutine唤醒;
4.png

从channel读数据

从一个channel读数据简单过程如下:

  1. 如果等待发送队列sendq不为空,且没有缓冲区,直接从sendq中取出G,把G中数据读出,
    最后把G唤醒,结束读取过程;
  2. 如果等待发送队列sendq不为空,此时说明缓冲区已满,从缓冲区中首部读出数据,把G中
    数据写入缓冲区尾部,把G唤醒,结束读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,结束读取过程;
  4. 将当前goroutine加入recvq,进入睡眠,等待被写goroutine唤醒;
5.png

关闭channel

关闭channel时会把recvq中的G全部唤醒,本该写入G的数据位置为nil。把sendq中的G全部唤醒,但这些G会panic。

func closechan(c *hchan) {
 if c == nil {
 panic(plainError("close of nil channel"))
 }
 lock(&c.lock)
 if c.closed != 0 {
 unlock(&c.lock)
 panic(plainError("close of closed channel"))
 }
 if raceenabled {
 callerpc := getcallerpc()
 racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
 racerelease(c.raceaddr())
 }
 c.closed = 1
 var glist gList
 // 释放所有接收者
 for {
 sg := c.recvq.dequeue()
 if sg == nil {
 break
 }
 if sg.elem != nil {
 typedmemclr(c.elemtype, sg.elem)
 sg.elem = nil
 }
 if sg.releasetime != 0 {
 sg.releasetime = cputicks()
 }
 gp := sg.g
 gp.param = nil
 if raceenabled {
 raceacquireg(gp, c.raceaddr())
 }
 glist.push(gp)
 }
 // 释放所有发送者
 for {
 sg := c.sendq.dequeue()
 if sg == nil {
 break
 }
 sg.elem = nil
 if sg.releasetime != 0 {
 sg.releasetime = cputicks()
 }
 gp := sg.g
 gp.param = nil
 if raceenabled {
 raceacquireg(gp, c.raceaddr())
 }
 glist.push(gp)
 }
 unlock(&c.lock)
 // 垃圾回收
 for !glist.empty() {
 gp := glist.pop()
 gp.schedlink = 0
 goready(gp, 3)
 } }

除此之外,panic出现的常⻅场景还有:

  1. 关闭值为nil的channel
  2. 关闭已经被关闭的channel
  3. 向已经关闭的channel写数据

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:强某某

查看原文:goroutine、channel原理

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1182 次点击
被以下专栏收入,发现更多相似内容
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏