分享
  1. 首页
  2. 文章

Golang网络:核心API实现剖析(一)

丁凯 · · 4401 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

这一章节我们将详细描述网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。

Listen

func Listen(net, laddr string) (Listener, error) {
 la, err := resolveAddr("listen", net, laddr, noDeadline)
 ......
 switch la := la.toAddr().(type) {
 case *TCPAddr:
 l, err = ListenTCP(net, la)
 case *UnixAddr:
 ......
 }
 ......
}
// 对于tcp协议,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
 ......
 fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
 ......
 return &TCPListener{fd}, nil
}
func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
 ......
 return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}
func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
 // 创建底层socket,设置属性为O_NONBLOCK
 s, err := sysSocket(family, sotype, proto)
 ......
 setDefaultSockopts(s, family, sotype, ipv6only)
 // 创建新netFD结构
 fd, err = newFD(s, family, sotype, net)
 ......
 if laddr != nil && raddr == nil {
 switch sotype {
 case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
 // 调用底层listen监听创建的套接字
 fd.listenStream(laddr, listenerBacklog)
 return fd, nil
 case syscall.SOCK_DGRAM:
 ......
 }
 } 
}
// 最终调用该函数来创建一个socket
// 并且将socket属性设置为O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
 syscall.ForkLock.RLock()
 s, err := syscall.Socket(family, sotype, proto)
 if err == nil {
 syscall.CloseOnExec(s)
 }
 syscall.ForkLock.RUnlock()
 if err != nil {
 return -1, err
 }
 if err = syscall.SetNonblock(s, true); err != nil {
 syscall.Close(s)
 return -1, err
 }
 return s, nil
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
 if err := setDefaultListenerSockopts(fd.sysfd)
 if lsa, err := laddr.sockaddr(fd.family); err != nil {
 return err
 } else if lsa != nil {
 // Bind绑定至该socket
 if err := syscall.Bind(fd.sysfd, lsa); err != nil {
 return os.NewSyscallError("bind", err)
 }
 }
 // 监听该socket
 if err := syscall.Listen(fd.sysfd, backlog); 
 // 这里非常关键:初始化socket与异步IO相关的内容
 if err := fd.init(); err != nil {
 return err
 }
 lsa, _ := syscall.Getsockname(fd.sysfd)
 fd.setAddr(fd.addrFunc()(lsa), nil)
 return nil
}

我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。

对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?

func (fd *netFD) init() error {
 if err := fd.pd.Init(fd); err != nil {
 return err
 }
 return nil
}
func (pd *pollDesc) Init(fd *netFD) error {
 // 利用了Once机制,保证一个进程只会执行一次
 // runtime_pollServerInit: 
 // TEXT net·runtime_pollServerInit(SB),NOSPLIT,0ドル-0
 // JMP runtime·netpollServerInit(SB)
 serverInit.Do(runtime_pollServerInit)
 // runtime_pollOpen:
 // TEXT net·runtime_pollOpen(SB),NOSPLIT,0ドル-0
 // JMP runtime·netpollOpen(SB)
 ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
 if errno != 0 {
 return syscall.Errno(errno)
 }
 pd.runtimeCtx = ctx
 return nil
}

这里就是socket异步编程的关键:

netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;

func netpollinit() {
 epfd = epollcreate1(_EPOLL_CLOEXEC)
 if epfd >= 0 {
 return
 }
 epfd = epollcreate(1024)
 if epfd >= 0 {
 closeonexec(epfd)
 return
 }
 ......
}

netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen

func netpollopen(fd uintptr, pd *pollDesc) int32 {
 var ev epollevent
 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。

Accept

既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:

func (l *TCPListener) Accept() (Conn, error) {
 c, err := l.AcceptTCP()
 ......
}
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
 ......
 fd, err := l.fd.accept()
 ......
 // 返回给调用者一个新的TCPConn
 return newTCPConn(fd), nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
 // 为什么对该函数加读锁?
 if err := fd.readLock(); err != nil {
 return nil, err
 }
 defer fd.readUnlock()
 ......
 for {
 // 这个accept是golang包装的系统调用
 // 用来处理跨平台
 s, rsa, err = accept(fd.sysfd)
 if err != nil {
 if err == syscall.EAGAIN {
 // 如果没有可用连接,WaitRead()阻塞该协程
 // 后面会详细分析WaitRead.
 if err = fd.pd.WaitRead(); err == nil {
 continue
 }
 } else if err == syscall.ECONNABORTED {
 // 如果连接在Listen queue时就已经被对端关闭
 continue
 }
 }
 break
 }
 netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
 ......
 // 这个前面已经分析,将该fd添加到epoll队列中
 err = netfd.init()
 ......
 lsa, _ := syscall.Getsockname(netfd.sysfd)
 netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
 return netfd, nil
}

OK,从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。

一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。

Read

func (c *conn) Read(b []byte) (int, error) {
 if !c.ok() {
 return 0, syscall.EINVAL
 }
 return c.fd.Read(b)
}
func (fd *netFD) Read(p []byte) (n int, err error) {
 // 为什么对函数调用加读锁
 if err := fd.readLock(); err != nil {
 return 0, err
 }
 defer fd.readUnlock()
 // 这个又是干嘛?
 if err := fd.pd.PrepareRead(); err != nil {
 return 0, &OpError{"read", fd.net, fd.raddr, err}
 }
 for {
 n, err = syscall.Read(int(fd.sysfd), p)
 if err != nil {
 n = 0
 // 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
 if err == syscall.EAGAIN {
 if err = fd.pd.WaitRead(); err == nil {
 continue
 }
 }
 }
 // 检查错误,封装io.EOF
 err = chkReadErr(n, err, fd)
 break
 }
 if err != nil && err != io.EOF {
 err = &OpError{"read", fd.net, fd.raddr, err}
 }
 return
}
func chkReadErr(n int, err error, fd *netFD) error {
 if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
 return io.EOF
 }
 return err
}

Read的流程与Accept流程极其一致,阅读起来也很简单。相信不用作过多解释,自己看吧。 需要注意的是每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。

Write

func (fd *netFD) Write(p []byte) (nn int, err error) {
 // 为什么这里加写锁
 if err := fd.writeLock(); err != nil {
 return 0, err
 }
 defer fd.writeUnlock()
 // 这个是干什么?
 if err := fd.pd.PrepareWrite(); err != nil {
 return 0, &OpError{"write", fd.net, fd.raddr, err}
 }
 // nn记录总共写入的数据量,每次Write可能只能写入部分数据
 for {
 var n int
 n, err = syscall.Write(int(fd.sysfd), p[nn:])
 if n > 0 {
 nn += n
 }
 // 如果数组数据已经全部写完,函数返回
 if nn == len(p) {
 break
 }
 // 如果写入数据时被block了,阻塞当前协程
 if err == syscall.EAGAIN {
 if err = fd.pd.WaitWrite(); err == nil {
 continue
 }
 }
 if err != nil {
 n = 0
 break
 }
 // 如果返回值为0,代表了什么?
 if n == 0 {
 err = io.ErrUnexpectedEOF
 break
 }
 }
 if err != nil {
 err = &OpError{"write", fd.net, fd.raddr, err}
 }
 return nn, err
}

注意Write语义与Read不一样的地方:

Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。

总结

上面我们基本说完了golang网络编程内的关键API流程,我们遗留了一个关键内容:当系统调用返回EAGAIN时,会 调用WaitRead/WaitWrite来阻塞当前协程,我会在接下来的章节中继续分析。


有疑问加站长微信联系(非本文作者)

本文来自:知乎专栏

感谢作者:丁凯

查看原文:Golang网络:核心API实现剖析(一)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
4401 次点击
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏