分享
  1. 首页
  2. 文章

golang 源码剖析(6): 通道

darcyaf · · 1343 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

简介(js)

通道(channel) 是Go实现CSP并发模型的关键, 鼓励用通信来实现数据共享。 Dont' communicate by sharing memory, share memory by communicating.
CSP: Communicating Sequential Process

创建

chan.go中 hchan的结构

type hchan struct {
 qcount uint // total data in the queue
 dataqsiz uint // size of the circular queue
 buf unsafe.Pointer // points to an array of dataqsiz elements
 elemsize uint16
 closed uint32
 elemtype *_type // element type
 sendx uint // send index
 recvx uint // receive index
 recvq waitq // list of recv waiters
 sendq waitq // list of send waiters
 // lock protects all fields in hchan, as well as several
 // fields in sudogs blocked on this channel.
 //
 // Do not change another G's status while holding this lock
 // (in particular, do not ready a G), as this can deadlock
 // with stack shrinking.
 lock mutex
}
type waitq struct {
 first *sudog
 last *sudog
}

makechan: 这里先做了一些元素大小,队列大小检查。受垃圾回收器的限制,如果包含指针类型,则缓冲槽需单独分配内存,否则可一次性分配,调整buf的指针,最后设置size等属性

func makechan(t *chantype, size int) *hchan {
 elem := t.elem
 // compiler checks this but be safe.
 if elem.size >= 1<<16 { //限制chan的元素大小
 throw("makechan: invalid channel element type")
 }
 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) //检查是否溢出
 if overflow || mem > maxAlloc-hchanSize || size < 0 {
 panic(plainError("makechan: size out of range"))
 }
 var c *hchan
 switch {
 case mem == 0:
 // Queue or element size is zero.
 c = (*hchan)(mallocgc(hchanSize, nil, true))
 // Race detector uses this location for synchronization.
 c.buf = c.raceaddr()
 case elem.ptrdata == 0:
 // Elements do not contain pointers.
 // Allocate hchan and buf in one call.
 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
 c.buf = add(unsafe.Pointer(c), hchanSize)
 default:
 // Elements contain pointers.
 c = new(hchan)
 c.buf = mallocgc(mem, elem, true)
 }

收发

这里用sudog用来保存收发队列,其中包含一个元素和g的指针,这里也实现了cache,central那一套缓存体系.
acquireSudog获取sudog和releaseSudog释放sudog, 大致流程也是先从本地p获取,接着再去sched.sudogcache中获取.

type sudog struct {
 g *g
 elem unsafe.Pointer // data element (may point to stack)
}
type p struct {
 sudogcache []*sudog
 sudogbuf [128]*sudog
}
type schedt struct {
 // Central cache of sudog structs.
 sudoglock mutex
 sudogcache *sudog
}

发送

在go1.13的源码中已经不判断c.dataqsiz==0, 也就是将缓冲长度的0的大于0的整合在一起了。
如果block=false: 如果通道为nil, 则直接返回false. 对于无缓冲的情况,如果没有接收者会直接return false。 如果有缓冲但是缓冲满了也会return false。

如果通道关闭了,会触发panic。
尝试等待队列c.recvq中有等待者的话, 就直接将数据复制到sg.elem(如果是带缓冲的则更新缓冲的index等参数),并唤醒对应的groutine。

如果没有等待者,并且缓冲队列能存下,则获取一个sudog之后将数据放入sendq并返回

如果缓冲队列存不下,则调用goparkunlock然当前goroutine休眠,直到被goready唤醒,然后释放当前的sudog

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
 chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) {
 if c == nil {
 if !block {
 return false
 }
 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
 throw("unreachable")
 }
 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
 (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
 return false
 }
 if c.closed != 0 {
 unlock(&c.lock)
 panic(plainError("send on closed channel"))
 }
 if sg := c.recvq.dequeue(); sg != nil {
 // Found a waiting receiver. We pass the value we want to send
 // directly to the receiver, bypassing the channel buffer (if any).
 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
 return true
 }
 if c.qcount < c.dataqsiz {
 // Space is available in the channel buffer. Enqueue the element to send.
 qp := chanbuf(c, c.sendx)
 if raceenabled {
 raceacquire(qp)
 racerelease(qp)
 }
 typedmemmove(c.elemtype, qp, ep)
 c.sendx++
 if c.sendx == c.dataqsiz {
 c.sendx = 0
 }
 c.qcount++
 unlock(&c.lock)
 return true
 }
}

接收

接收类似,但是在通道关闭并且缓冲中无数据时,会返回一个默认值。
故而在通道关闭之后还是能获取到一个值. 但是此时的返回中received变成了false

注意: 可能是由于如果队列满的话,可以直接将那块地址的数据做swap,才将有数据分为队列满不满的两种.在看select的时候判断条件有点让人不好理解.
在recv函数中, sg是sender,go 在这边的处理流程是sg := c.sendq.dequeue(),先从sendq中取出一个,如果sg不为nil,则调用recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
在recv()中,如果c.dataqsiz>0,也就是带缓冲chan,将调用typedmemmove(c.elemtype, ep, qp) 把queue的数据复制给ep,然后调用typedmemmove(c.elemtype, qp, sg.elem)将sg.elem(也就是pop出来的sender)的数据复制给qp

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if c.dataqsiz == 0 {
 } else {
 // Queue is full. Take the item at the
 // head of the queue. Make the sender enqueue
 // its item at the tail of the queue. Since the
 // queue is full, those are both the same slot.
 qp := chanbuf(c, c.recvx)
 if raceenabled {
 raceacquire(qp)
 racerelease(qp)
 raceacquireg(sg.g, qp)
 racereleaseg(sg.g, qp)
 }
 // copy data from queue to receiver
 if ep != nil {
 typedmemmove(c.elemtype, ep, qp)
 }
 // copy data from sender to queue
 typedmemmove(c.elemtype, qp, sg.elem)
 c.recvx++
 if c.recvx == c.dataqsiz {
 c.recvx = 0
 }
 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
 }
}
 if c.closed != 0 && c.qcount == 0 {
 if raceenabled {
 raceacquire(c.raceaddr())
 }
 unlock(&c.lock)
 if ep != nil {
 typedmemclr(c.elemtype, ep)
 }
 return true, false
 }

关闭

  • 关闭nil chan,或者重复关闭会 panic
  • 将c.closed 置为1
  • 循环pop c.recvq和c.sendq, 清空其数据,将gp.param置为nil, 最后都放入glist中
  • 轮训glist, 将所有接收和发送者都唤醒
  • 并未清理本身的buf
func closechan(c *hchan) {
 if c == nil {
 panic(plainError("close of nil channel"))
 }
 lock(&c.lock)
 if c.closed != 0 {
 unlock(&c.lock)
 panic(plainError("close of closed channel"))
 }
 c.closed = 1
 // release all readers
 for {
 sg := c.recvq.dequeue()
 if sg == nil {
 break
 }
 if sg.elem != nil {
 typedmemclr(c.elemtype, sg.elem)
 sg.elem = nil
 }
 if sg.releasetime != 0 {
 sg.releasetime = cputicks()
 }
 gp := sg.g
 gp.param = nil
 if raceenabled {
 raceacquireg(gp, c.raceaddr())
 }
 glist.push(gp)
 }
 // release all writers (they will panic)
 for {
 sg := c.sendq.dequeue()
 if sg == nil {
 break
 }
 sg.elem = nil
 if sg.releasetime != 0 {
 sg.releasetime = cputicks()
 }
 gp := sg.g
 gp.param = nil
 if raceenabled {
 raceacquireg(gp, c.raceaddr())
 }
 glist.push(gp)
 }
 unlock(&c.lock)
 // Ready all Gs now that we've dropped the channel lock.
 for !glist.empty() {
 gp := glist.pop()
 gp.schedlink = 0
 goready(gp, 3)
 }
}

select

在go1.13的源码中,runtime/select.go中已经没有newselect方法了,
select的处理移到了src/cmd/compile/internal/gc/select.go中. 大概看看注释就好了,不然就涉及到编译的过程了
在编译的时候,会遍历所有的节点,生成节点树,这是如果是OSELECT的话,则会调用walkselect,
walkselectcases中。对Node这个对象就不研究了

  1. 如果len(cases),则会生成一个调用block()的node
  2. 如果len(cases)==1: 生成节点,mkcall("block", nil, &ln)大概就是会一直等待这个并一直堵塞,这也大概能解释当只设置一个case是个发送chan时,recv数据都能收到,如果加上default,则有的数据可能会丢失(毕竟堵着的话会跳到default或者别的)
  3. 如果len(cases)==2:会遍历所有cases.
  4. 最后给这个list加上selectgo的调用
// The result of walkstmt MUST be assigned back to n, e.g.
// n.Left = walkstmt(n.Left)
func walkstmt(n *Node) *Node {
 case OSELECT:
 walkselect(n)
}
func walkselect(sel *Node) {
}
func walkselectcases(cases *Nodes) []*Node {
 if n == 0 {
 return []*Node{mkcall("block", nil, nil)}
 }
 // optimization: one-case select: single op.
 // TODO(rsc): Reenable optimization once order.go can handle it.
 // golang.org/issue/7672.
 if n == 1 {}
 // convert case value arguments to addresses.
 // this rewrite is used by both the general code and the next optimization.
 for _, cas := range cases.Slice() {}
 // optimization: two-case select but one is default: single non-blocking op.
 if n == 2 && (cases.First().Left == nil || cases.Second().Left == nil) {}
 // generate sel-struct
 selv := temp(types.NewArray(scasetype(), int64(n)))
 order := temp(types.NewArray(types.Types[TUINT16], 2*int64(n)))
 // register cases
 for i, cas := range cases.Slice() {
 setField("kind", nodintconst(kind))
 if c != nil {
 c = convnop(c, types.Types[TUNSAFEPTR])
 setField("c", c)
 }
 if elem != nil {
 elem = convnop(elem, types.Types[TUNSAFEPTR])
 setField("elem", elem)
 }
 if instrumenting {
 r = mkcall("selectsetpc", nil, nil, bytePtrToIndex(selv, int64(i)))
 init = append(init, r)
 }
 fn := syslook("selectgo")
 r.Rlist.Set1(mkcall1(fn, fn.Type.Results(), nil, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), nodintconst(int64(n))))
}

selectgo就是go总select语句的实现了

  1. 转类型成scases,pollorder,lockorder三个数组
  2. 将nil channel的scase统一成scase{},也就是caseNil类型方便处理
  3. 遍历case, 用fastrandn随机生成一个j,交换i,j的数据放到交换后的pollorder数组中
  4. 根据hchan的地址获得locking order(锁的顺序),使用简单堆排序来保证nlogn时间和常熟堆栈足迹
  5. 设置锁,将所有的chan锁住
  6. 开始遍历选择
    • 第一轮,按照pollorder,查找是否有已经在等待的,如果未找到,则看是否有caseDefault,有的话执行默认,然后返回. 这里对通道的检查, 如果所有的数据都堵塞(进不去,或者出不来) 则进入第二轮
    • 第二轮,将所有的chan都入队列。 caseRecv入c.recvq,caseSend入sendq,将当前G休眠等待被某一个chan唤醒(selparkcommit会将unlock所有chan)
    • 第三轮, 轮训所有的case,将原先入队的数据全部dequeue,从queue中移除,并返回casei, 也就是获取到数据的case位置,
      然后判断,cas是不是nil, 因为有可能是close(chan)事件唤醒的,这时就需要再次loop,当然如果还是判断到closed的这个case, 这里就会返回默认值然后退出。
      这里比较重要的一个是:
  1. 如果chan是nil,则分支永远走不到。 如果chan是closed,那么只要轮到(由于算法的随机,可能有别的chan先走到)肯定都能进去
type scase struct {
 c *hchan // chan
 elem unsafe.Pointer // data element
 kind uint16
 pc uintptr // race pc (for race detector / msan)
 releasetime int64
}
// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16. Both reside on the goroutine's
// stack (regardless of any escaping in selectgo).
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
 // 将cas0和order0都转为数组
 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
 //转为slice,并拆分为pollorder和lockorder
 scases := cas1[:ncases:ncases]
 pollorder := order1[:ncases:ncases]
 lockorder := order1[ncases:][:ncases:ncases]
 // 遍历,将所有chan为nil的都改为scase{} 
 // Replace send/receive cases involving nil channels with
 // caseNil so logic below can assume non-nil channel.
 for i := range scases {
 cas := &scases[i]
 if cas.c == nil && cas.kind != caseDefault {
 *cas = scase{}
 }
 }
 // generate permuted order
 for i := 1; i < ncases; i++ {
 j := fastrandn(uint32(i + 1))
 pollorder[i] = pollorder[j]
 pollorder[j] = uint16(i)
 }
 // lock all the channels involved in the select
 sellock(scases, lockorder)
loop:
 // pass 1 - look for something already waiting
 // pass 2 - enqueue on all chans
 // wait for someone to wake us up
 // pass 3 - dequeue from unsuccessful chans
 selunlock(scases, lockorder)
 goto retc
}

其他

这里想到一个竞争的问题,也就是select阻塞时入了所有的chan列表,当多个chan都去唤醒时怎么保证这个竞争问题
ready这个函数中如果一个协程已经不是Gwaiting状态,再次设置则会报错.
解决的关键就在于selectDone这个参数
dequeue函数中, sgp.g.selectDone这个参数是原子性的,在入队时将其isSelect参数设置为true.
通过这个判断,和对selectDone改为1的过程中,如果改失败了则会跳过这个g,继续选择, 在select的处理逻辑中,当该协程唤醒后,会将select中的chan全部退回,这样就不会出现问题了。


// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
 dumpgstatus(gp)
 throw("bad g->status in ready")
}
func (q *waitq) dequeue() *sudog {
 if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
 continue
 }
}

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:darcyaf

查看原文:golang 源码剖析(6): 通道

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1343 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏