分享
  1. 首页
  2. 文章

golang源码学习之select

ihornet · · 1126 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

先上结论吧

  • select 是针对chan类型的, 所以case 只有default和chan(读/写)两种
  • 遍历case的时候顺序不确定,但chan的优先级比default高。当有default和可执行的chan时,总是执行chan
  • 当没有default,且无可执行的chan时,阻塞
  • select{}, 阻塞

开始看源码吧

scase

// case 的几种类型
const (
 caseNil = iota
 caseRecv
 caseSend
 caseDefault
)
type scase struct {
 c *hchan // chan
 elem unsafe.Pointer // data element 数据元素
 kind uint16 // 对应const的那几种类型
 pc uintptr // race pc (for race detector / msan)
 releasetime int64
}

入口

func reflect_rselect(cases []runtimeSelect) (int, bool) {
 // 没有case 时直接阻塞, 所以我们在demo时 main(){ .... select{} }
 if len(cases) == 0 {
 block()
 }
 sel := make([]scase, len(cases))
 // 为什么是 2 倍呢? 后面的pollorder和lockorder会用到
 order := make([]uint16, 2*len(cases))
 for i := range cases {
 rc := &cases[i]
 switch rc.dir {
 case selectDefault:
 sel[i] = scase{kind: caseDefault}
 case selectSend:
 sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
 case selectRecv:
 sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
 }
 if raceenabled || msanenabled {
 selectsetpc(&sel[i])
 }
 }
 return selectgo(&sel[0], &order[0], len(cases))
}

这里主要是初始化case数组。重点在selectgo中

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
 if debugSelect {
 print("select: cas0=", cas0, "\n")
 }
 // 指向case数组首地址
 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
 // order1 长度是 cas1 的两倍
 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
 // slice里面有两个冒号什么意思呢? a[x:y:z] 切片长度: y-x 切片容量:z-x
 scases := cas1[:ncases:ncases]
 // 轮询顺序
 pollorder := order1[:ncases:ncases]
 // chan 加锁顺序
 lockorder := order1[ncases:][:ncases:ncases] //赋值完之后,其实只用了order1的 2*ncases长度,pollorder占了前面ncases, lockorder占了后面ncases
 // Replace send/receive cases involving nil channels with
 // caseNil so logic below can assume non-nil channel.
 for i := range scases {
 cas := &scases[i]
 if cas.c == nil && cas.kind != caseDefault {
 *cas = scase{}
 }
 }
 var t0 int64
 if blockprofilerate > 0 {
 t0 = cputicks()
 for i := 0; i < ncases; i++ {
 scases[i].releasetime = -1
 }
 }
 // 洗牌 打乱pollorder顺序
 for i := 1; i < ncases; i++ {
 j := fastrandn(uint32(i + 1))
 pollorder[i] = pollorder[j]
 pollorder[j] = uint16(i)
 }
 // sort the cases by Hchan address to get the locking order.
 // simple heap sort, to guarantee n log n time and constant stack footprint.
 // 下面一堆for循环,根据 hchan的地址排序, 生成lockorder加锁顺序
 for i := 0; i < ncases; i++ {
 j := i
 // Start with the pollorder to permute cases on the same channel.
 c := scases[pollorder[i]].c
 // 这里的sortkey() 其实只是返回内存地址
 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
 k := (j - 1) / 2
 lockorder[j] = lockorder[k]
 j = k
 }
 lockorder[j] = pollorder[i]
 }
 for i := ncases - 1; i >= 0; i-- {
 o := lockorder[i]
 c := scases[o].c
 lockorder[i] = lockorder[0]
 j := 0
 for {
 k := j*2 + 1
 if k >= i {
 break
 }
 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
 k++
 }
 if c.sortkey() < scases[lockorder[k]].c.sortkey() {
 lockorder[j] = lockorder[k]
 j = k
 continue
 }
 break
 }
 lockorder[j] = o
 }
 if debugSelect {
 for i := 0; i+1 < ncases; i++ {
 if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
 print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
 throw("select: broken sort")
 }
 }
 }
 // lock all the channels involved in the select
 sellock(scases, lockorder)
 var (
 gp *g
 sg *sudog
 c *hchan
 k *scase
 sglist *sudog
 sgnext *sudog
 qp unsafe.Pointer
 nextp **sudog
 )
loop:
 // pass 1 - look for something already waiting
 // CASE 1: case中有可执行的chan, 或者存在default case
 var dfli int
 var dfl *scase
 var casi int
 var cas *scase
 var recvOK bool
 //开始遍历case数组了
 for i := 0; i < ncases; i++ {
 casi = int(pollorder[i])
 cas = &scases[casi]
 c = cas.c
 switch cas.kind {
 // chan 为空 下一轮循环
 case caseNil:
 continue
 case caseRecv: // 接收chan
 sg = c.sendq.dequeue()
 //当chan的send队列存在 G 时
 if sg != nil {
 goto recv
 }
 // 当chan的缓存队列存在元素时
 if c.qcount > 0 {
 goto bufrecv
 }
 // 当chan关闭时
 if c.closed != 0 {
 goto rclose
 }
 case caseSend: // 发送队列
 if raceenabled {
 racereadpc(c.raceaddr(), cas.pc, chansendpc)
 }
 // chan关闭时
 if c.closed != 0 {
 goto sclose
 }
 sg = c.recvq.dequeue()
 // chan的接收队列存在 G 时
 if sg != nil {
 goto send
 }
 // chan的缓存队列的元素少于缓存容量时
 if c.qcount < c.dataqsiz {
 goto bufsend
 }
 case caseDefault: // case default, 你看 default的情况并没有结束循环,说明 chan的优先级比default高
 dfli = casi
 dfl = cas
 }
 }
 if dfl != nil {
 selunlock(scases, lockorder)
 casi = dfli
 cas = dfl
 goto retc
 }
 // pass 2 - enqueue on all chans
 // CASE 2: 将当前的 G 加入的 chan 的等待队列中
 gp = getg()
 if gp.waiting != nil {
 throw("gp.waiting != nil")
 }
 nextp = &gp.waiting
 for _, casei := range lockorder {
 casi = int(casei)
 cas = &scases[casi]
 if cas.kind == caseNil {
 continue
 }
 c = cas.c
 sg := acquireSudog()
 sg.g = gp
 sg.isSelect = true
 // No stack splits between assigning elem and enqueuing
 // sg on gp.waiting where copystack can find it.
 sg.elem = cas.elem
 sg.releasetime = 0
 if t0 != 0 {
 sg.releasetime = -1
 }
 sg.c = c
 // Construct waiting list in lock order.
 *nextp = sg
 nextp = &sg.waitlink
 switch cas.kind {
 case caseRecv:
 // 加入等待接收队列
 // 不断的循环是不是会导致队列边长呢? 其实不是的, 因为在CASE 1 的时候有做出栈操作
 c.recvq.enqueue(sg)
 case caseSend:
 // 加入等待发送队列
 c.sendq.enqueue(sg)
 }
 }
 // wait for someone to wake us up
 gp.param = nil
 // 当前 G 进入休眠
 gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
 sellock(scases, lockorder)
 gp.selectDone = 0
 sg = (*sudog)(gp.param)
 gp.param = nil
 // pass 3 - dequeue from unsuccessful chans
 // otherwise they stack up on quiet channels
 // record the successful case, if any.
 // We singly-linked up the SudoGs in lock order.
 // CASE 3: 被唤醒, 这种情况是不存在default的时候
 casi = -1
 cas = nil
 sglist = gp.waiting
 // Clear all elem before unlinking from gp.waiting.
 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
 sg1.isSelect = false
 sg1.elem = nil
 sg1.c = nil
 }
 gp.waiting = nil
 for _, casei := range lockorder {
 k = &scases[casei]
 if k.kind == caseNil {
 continue
 }
 if sglist.releasetime > 0 {
 k.releasetime = sglist.releasetime
 }
 // 这段代码一直没想明白,直到我回想到chan的send()方法时,才有些明白了。
 // sg = (*sudog)(gp.param), gp.param其实就是sudog,也就是加入等待队列时的sudog。
 // 当被唤醒时,唤醒的是gp.param,所以遍历等待队列 判断sudog相等就可以确定是哪个case了
 if sg == sglist {
 // sg has already been dequeued by the G that woke us up.
 casi = int(casei)
 cas = k
 } else {
 c = k.c
 if k.kind == caseSend {
 c.sendq.dequeueSudoG(sglist)
 } else {
 c.recvq.dequeueSudoG(sglist)
 }
 }
 sgnext = sglist.waitlink
 sglist.waitlink = nil
 releaseSudog(sglist)
 sglist = sgnext
 }
 // 没找到对应的case, 重新进入loop
 if cas == nil {
 goto loop
 }
 c = cas.c
 if debugSelect {
 print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
 }
 if cas.kind == caseRecv {
 recvOK = true
 }
 if raceenabled {
 if cas.kind == caseRecv && cas.elem != nil {
 raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
 } else if cas.kind == caseSend {
 raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
 }
 }
 if msanenabled {
 if cas.kind == caseRecv && cas.elem != nil {
 msanwrite(cas.elem, c.elemtype.size)
 } else if cas.kind == caseSend {
 msanread(cas.elem, c.elemtype.size)
 }
 }
 selunlock(scases, lockorder)
 goto retc
bufrecv:
 // can receive from buffer
 if raceenabled {
 if cas.elem != nil {
 raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
 }
 raceacquire(chanbuf(c, c.recvx))
 racerelease(chanbuf(c, c.recvx))
 }
 if msanenabled && cas.elem != nil {
 msanwrite(cas.elem, c.elemtype.size)
 }
 recvOK = true
 qp = chanbuf(c, c.recvx)
 if cas.elem != nil {
 // 将chan缓存中的数据拷贝到 case.elem。 eg: a := <-ch, a就是case.elem
 typedmemmove(c.elemtype, cas.elem, qp)
 }
 typedmemclr(c.elemtype, qp)
 c.recvx++
 if c.recvx == c.dataqsiz {
 c.recvx = 0
 }
 c.qcount--
 selunlock(scases, lockorder)
 goto retc
bufsend:
 // can send to buffer
 if raceenabled {
 raceacquire(chanbuf(c, c.sendx))
 racerelease(chanbuf(c, c.sendx))
 raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
 }
 if msanenabled {
 msanread(cas.elem, c.elemtype.size)
 }
 // 将cas.elem拷贝到chan的缓存中。eg: ch <- a, a 就是 cas.elem
 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
 c.sendx++
 if c.sendx == c.dataqsiz {
 c.sendx = 0
 }
 c.qcount++
 selunlock(scases, lockorder)
 goto retc
recv:
 // can receive from sleeping sender (sg)
 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
 if debugSelect {
 print("syncrecv: cas0=", cas0, " c=", c, "\n")
 }
 recvOK = true
 goto retc
rclose:
 // read at end of closed channel
 selunlock(scases, lockorder)
 recvOK = false
 if cas.elem != nil {
 typedmemclr(c.elemtype, cas.elem)
 }
 if raceenabled {
 raceacquire(c.raceaddr())
 }
 goto retc
send:
 // can send to a sleeping receiver (sg)
 if raceenabled {
 raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
 }
 if msanenabled {
 msanread(cas.elem, c.elemtype.size)
 }
 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
 if debugSelect {
 print("syncsend: cas0=", cas0, " c=", c, "\n")
 }
 goto retc
retc:
 if cas.releasetime > 0 {
 blockevent(cas.releasetime-t0, 1)
 }
 return casi, recvOK
sclose:
 // send on closed channel
 selunlock(scases, lockorder)
 panic(plainError("send on closed channel"))
}

bufrecv、bufsend、recv、rclose、send最终都会跳转到retc。 这里涉及到一些channel的知识,有兴趣的可以看我另一篇关于channel的文章 https://www.jianshu.com/p/9dd5e77469da


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:ihornet

查看原文:golang源码学习之select

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1126 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏