分享
  1. 首页
  2. 文章

Golang网络:核心API实现剖析二)

丁凯 · · 2771 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

说明

前面的章节我们基本聊完了golang网络编程的关键API流程,但遗留了一个关键内容:当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,现在我们接着聊。

WaitRead/WaitWrite

func (pd *pollDesc) Wait(mode int) error {
 res := runtime_pollWait(pd.runtimeCtx, mode)
 return convertErr(res)
}
func (pd *pollDesc) WaitRead() error {
 return pd.Wait('r')
}
func (pd *pollDesc) WaitWrite() error {
 return pd.Wait('w')
}

最终runtime_pollWait走到下面去了:

TEXT net·runtime_pollWait(SB),NOSPLIT,0ドル-0 
 JMP runtime·netpollWait(SB)

我们仔细考虑应该明白:netpollWait的主要作用是:等待关心的socket是否有事件(其实后面我们知道只是等待一个标记位是否发生改变),如果没有事件,那么就将当前的协程挂起,直到有通知事件发生,我们接下来看看到底如何实现:

func netpollWait(pd *pollDesc, mode int) int {
 // 先检查该socket是否有error发生(如关闭、超时等) 
 err := netpollcheckerr(pd, int32(mode))
 if err != 0 {
 return err
 }
 // As for now only Solaris uses level-triggered IO. 
 if GOOS == "solaris" {
 onM(func() {
 netpollarm(pd, mode)
 })
 }
 // 循环等待netpollblock返回值为true 
 // 如果返回值为false且该socket未出现任何错误 
 // 那该协程可能被意外唤醒,需要重新被挂起 
 // 还有一种可能:该socket由于超时而被唤醒 
 // 此时netpollcheckerr就是用来检测超时错误的 
 for !netpollblock(pd, int32(mode), false) {
 err = netpollcheckerr(pd, int32(mode))
 if err != 0 {
 return err
 }
 }
 return 0 
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 gpp := &pd.rg
 if mode == 'w' {
 gpp = &pd.wg
 }
 // set the gpp semaphore to WAIT 
 // 首先将轮询状态设置为pdWait 
 // 为什么要使用for呢?因为casuintptr使用了自旋锁 
 // 为什么使用自旋锁就要加for循环呢? 
 for {
 old := *gpp
 if old == pdReady {
 *gpp = 0 
 return true 
 }
 if old != 0 {
 gothrow("netpollblock: double wait")
 }
 // 将socket轮询相关的状态设置为pdWait 
 if casuintptr(gpp, 0, pdWait) {
 break 
 }
 }
 // 如果未出错将该协程挂起,解锁函数是netpollblockcommit 
 if waitio || netpollcheckerr(pd, mode) == 0 {
 f := netpollblockcommit
 gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
 }
 // 可能是被挂起的协程被唤醒 
 // 或者由于某些原因该协程压根未被挂起 
 // 获取其当前状态记录在old中 
 old := xchguintptr(gpp, 0)
 if old > pdWait {
 gothrow("netpollblock: corrupted state")
 }
 return old == pdReady
}

从上面的分析我们看到,如果无法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 我们接下来看看这些挂起的协程何时会被唤醒。

事件通知

golang运行库在系统运行过程中存在socket事件检查点,目前,该检查点主要位于以下几个地方:

runtime·startTheWorldWithSema(void):在完成gc后;
findrunnable():这个暂时不知道何时会触发?
sysmon:golang中的监控协程,会周期性检查就绪socket

TODO: 为什么是在这些地方检查socket就绪事件呢?

接下来我们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()

我们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll

func netpoll(block bool) (gp *g) {
 if epfd == -1 {
 return 
 }
 waitms := int32(-1)
 if !block {
 // 如果调用者不希望block 
 // 设置waitsm为0 
 waitms = 0 
 }
 var events [128]epollevent
retry:
 // 调用epoll_wait获取就绪事件 
 n := epollwait(epfd, &events[0], int32(len(events)), waitms)
 if n < 0 {
 ...
 }
 goto retry
 }
 for i := int32(0); i < n; i++ {
 ev := &events[i]
 if ev.events == 0 {
 continue 
 }
 var mode int32 
 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
 mode += 'r' 
 }
 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
 mode += 'w' 
 }
 // 对每个事件,调用了netpollready 
 // pd主要记录了与该socket关联的等待协程 
 if mode != 0 {
 pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
 netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
 }
 }
 // 如果调用者同步等待且本次未获取到就绪socket 
 // 继续重试 
 if block && gp == nil {
 goto retry
 }
 return gp
}

这个函数主要调用epoll_wait(当然,golang封装了系统调用)来获取就绪socket fd,对每个就绪的fd,调用netpollready()作进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。

netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。

// make pd ready, newly runnable goroutines (if any) are returned in rg/wg 
func netpollready(gpp **g, pd *pollDesc, mode int32) {
 var rg, wg *g
 if mode == 'r' || mode == 'r'+'w' {
 rg = netpollunblock(pd, 'r', true)
 }
 if mode == 'w' || mode == 'r'+'w' {
 wg = netpollunblock(pd, 'w', true)
 }
 // 将就绪协程添加至链表中 
 if rg != nil {
 rg.schedlink = *gpp
 *gpp = rg
 }
 if wg != nil {
 wg.schedlink = *gpp
 *gpp = wg
 }
}
// 将pollDesc的状态置为pdReady并返回就绪协程 
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
 gpp := &pd.rg
 if mode == 'w' {
 gpp = &pd.wg
 }
 for {
 old := *gpp
 if old == pdReady {
 return nil 
 }
 if old == 0 && !ioready {
 return nil 
 }
 var new uintptr 
 if ioready {
 new = pdReady
 }
 if casuintptr(gpp, old, new) {
 if old == pdReady || old == pdWait {
 old = 0 
 }
 return (*g)(unsafe.Pointer(old))
 }
 }
}

疑问:一个fd会被多个协程同时进行IO么?比如一个协程读,另外一个协程写?或者多个协程同时读?此时返回的是哪个协程就绪呢?

一个socket fd可支持并发读写,因为对于tcp协议来说,是全双工。读写操作的是不同缓冲区,但是不支持并发读和并发写,因为这样会错乱的。所以上面的netFD.RWLock()就是干这个作用的。


有疑问加站长微信联系(非本文作者)

本文来自:知乎专栏

感谢作者:丁凯

查看原文:Golang网络:核心API实现剖析二)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
2771 次点击
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏