分享
  1. 首页
  2. 文章

单机百万并发,golang 50行代码

UPDOWN_GG · · 6907 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本文首先介绍单机百万并发的测试方法和测试结果,然后分析go语言50行代码实现的单机百万并发网络服务器背后的秘密

组网

采用6台2核8G内存的云主机作为client

采用1台4核16G内存的云主机作为server

组网图

client端设置

设置系统打开的最大文件数为20万

ulimit -n 200000

修改端口可用范围为1024到65535

echo 1024 65535 > /proc/sys/net/ipv4/ip_local_port_range

单台client虚机建立18万连接

配置单网卡多ip,每个网卡配置三个ip,启动三个client进程,每个client进程指定不同的local ip建立6万连接,总共18万连接

server端配置

设置系统打开的最大文件数为100万

ulimit -n 1000000

设置半连接队列和全连接队列长度

测试过程中出现了一个现象,客户端建立了30000连接,服务端只建立了28570连接

经过排查,原因是:

1 全连接队列满了,如下图,overflowed次数在增加

2 tcp_abort_on_overflow 为0,表示如果三次握手第三步的时候全连接队列满了那么server扔掉client 发过来的ack(在server端认为连接还没建立起来)

tcp_abort_on_overflow为 1,表示第三步的时候如果全连接队列满了,server发送一个reset包给client,表示废掉这个握手过程和这个连接(本来在server端这个连接就还没建立起来)

解决方法:

设置半连接队列长度为10000

echo 10000 >/proc/sys/net/ipv4/tcp_max_syn_backlog

设置全连接队列长度为10000

echo 10000 >/proc/sys/net/core/somaxconn

参考 【转】关于TCP 半连接队列和全连接队列 - sidesky - 博客园

linux内核调优tcp_max_syn_backlog和somaxconn的区别-10931853-51CTO博客

设置conntrack最大连接数

默认net.nf_conntrack_max 为 262144,设置为100万

sysctl -w net.nf_conntrack_max=1000000

tcp最大连接数调优,可参考Linux 内核优化-调大TCP最大连接数 - 简书

最终测试结果

server建立起96万连接

平时ss命令使用最多的是ss -anp,这里需要注意在连接数非常大的时候,指定p参数命令慢的几乎不可用,这里只指定an参数

ss比netstat性能好,参考https://blog.csdn.net/hustsselbj/article/details/47438781

cpu和内存使用情况

cpu大概占用2个核,内存3g

查看cpu硬件信息,cpu的频率为2.4G

查看cpu硬件信息,参考 linux(centos)查看cpu硬件信息命令图解教程 电脑维修技术网

客户端、服务端代码实现

客户端


package main
import (
 "flag"
 "fmt"
 "net"
 "os"
 "time"
)
var RemoteAddr *string
var ConcurNum *int
var LocalAddr *string
func init() {
 RemoteAddr = flag.String("remote-ip", "127.0.0.1", "ip addr of remote server")
 ConcurNum = flag.Int("concurrent-num", 100, "concurrent number of client")
 LocalAddr = flag.String("local-ip", "0.0.0.0", "ip addr of remote server")
}
func consume() {
 laddr := &net.TCPAddr{IP: net.ParseIP(*LocalAddr)}
 var dialer net.Dialer
 dialer.LocalAddr = laddr
 conn, err := dialer.Dial("tcp", *RemoteAddr+":8888")
 if err != nil {
 fmt.Println("dial failed:", err)
 os.Exit(1)
 }
 defer conn.Close()
 buffer := make([]byte, 512)
 for {
 _, err2 := conn.Read(buffer)
 if err2 != nil {
 fmt.Println("Read failed:", err2)
 return
 }
 // fmt.Println("count:", n, "msg:", string(buffer))
 }
}
func main() {
 flag.Parse()
 for i := 0; i < *ConcurNum; i++ {
 go consume()
 }
 time.Sleep(3600 * time.Second)
}

服务端

package main
import (
 "fmt"
 "net"
 "os"
 "time"
)
var array []byte = make([]byte, 10)
func checkError(err error, info string) (res bool) {
 if err != nil {
 fmt.Println(info + " " + err.Error())
 return false
 }
 return true
}
func Handler(conn net.Conn) {
 for {
 _, err := conn.Write(array)
 if err != nil {
 return
 }
 time.Sleep(10 * time.Second)
 }
}
func main() {
 for i := 0; i < 10; i += 1 {
 array[i] = 'a'
 }
 service := ":8888"
 tcpAddr, _ := net.ResolveTCPAddr("tcp4", service)
 l, _ := net.ListenTCP("tcp", tcpAddr)
 for {
 conn, err := l.Accept()
 if err != nil {
 fmt.Printf("accept error, err=%s\n", err.Error())
 os.Exit(1)
 }
 go Handler(conn)
 }
}

高性能网络编程的线程模型

TPC

TPC 是 Thread Per Connection 的缩写,指每次有新的连接就新建一个线程去专门处理这个连接请求。


模型特点:

  • 采用阻塞式I/O模型获取输入数据
  • 每个连接都需要独立的线程完成数据输入,业务处理,数据返回的完整操作

存在的问题:

  • 并发数较大时,需要创建大量线程来处理连接,系统资源占用较大

reactor

reactor模式的核心组成包括reactor和线程池。reactor负责监听网络连接的IO是否可读可写,线程池负责具体业务的处理。在高并发的场景下,reactor采用epoll的效率非常高。


模型特点:

  • 采用非阻塞I/O,I/O多路复用
  • 采用线程池来处理业务

golang GPC模型

GPC 是 Goroutine Per Connection 的缩写,指每次有新的连接就新启动一个golang协程去专门处理这个连接请求。


模型特点:

  • 可采用阻塞IO的方式编程
  • 每个连接都需要独立的协程完成数据输入,业务处理,数据返回的完整操作

为什么GPC可以支持单机百万并发

GPC模型跟TPC模型看起来非常相似,为什么GPC可以支持单机百万并发呢?

GPC模型、TPC模型比较

  1. 栈大小:GPC模型中goroutine栈初始大小为4kB,栈的大小可以按需动态增加或减小。而TPC模型中线程默认栈大小为1MB。
  2. IO模型:GPC和TPC都是阻塞式编程。但是GPC模型底层是非阻塞IO,golang在语言层面将非阻塞IO包装成了阻塞IO(底层实现是非阻塞IO未就绪时,读操作返回EAGAIN,golang运行时系统将协程状态设置为Wait,进行协程的切换)
  3. 协程、线程的切换: 协程的切换比线程切换要简单的多,可参考linux操作系统笔记(进程)

GPC模型背后的秘密

GPC模型底层实现其实是reactor模型,golang在语言层面将这一模型封装好,可以采用阻塞的方式编码

GPC模型源码分析

golang源码版本为1.9.4


IO线程的源码实现

启动一个线程运行sysmon函数

runtime/proc.go

// The main goroutine.
func main() { 
 g := getg()
 
 // Racectx of m0->g0 is used only as the parent of the main goroutine.
 // It must not be used for anything else.
 g.m.g0.racectx = 0
 
 // Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
 // Using decimal instead of binary GB and MB because
 // they look nicer in the stack overflow failure message.
 if sys.PtrSize == 8 {
 maxstacksize = 1000000000
 } else { 
 maxstacksize = 250000000
 } 
 
 // Allow newproc to start new Ms.
 mainStarted = true 
 
 systemstack(func() {
 //启动线程,运行sysmon函数
 newm(sysmon, nil) 
 }) 
 ...........

sysmon的实现
sysmon函数执行netpoll,获得可读写的fd,将fd关联的协程的状态设置为ready

runtime/proc.go

func sysmon() { 
 // If a heap span goes unused for 5 minutes after a garbage collection,
 // we hand it back to the operating system.
 scavengelimit := int64(5 * 60 * 1e9)
 
 if debug.scavenge > 0 { 
 // Scavenge-a-lot for testing.
 forcegcperiod = 10 * 1e6
 scavengelimit = 20 * 1e6
 } 
 
 lastscavenge := nanotime() 
 nscavenge := 0 
 
 lasttrace := int64(0) 
 idle := 0 // how many cycles in succession we had not wokeup somebody
 delay := uint32(0) 
 for { 
 if idle == 0 { // start with 20us sleep...
 delay = 20 
 } else if idle > 50 { // start doubling the sleep after 1ms...
 delay *= 2 
 } 
 if delay > 10*1000 { // up to 10ms
 delay = 10 * 1000
 } 
 usleep(delay)
 。。。。
 // poll network if not polled for more than 10ms
 lastpoll := int64(atomic.Load64(&sched.lastpoll))
 now := nanotime() 
 if lastpoll != 0 && lastpoll+10*1000*1000 < now {
 atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
 //netpoll中会执行epollWait,epollWait返回可读写的fd
 //netpoll返回可读写的fd关联的协程
 gp := netpoll(false) // non-blocking - returns list of goroutines
 if gp != nil { 
 // Need to decrement number of idle locked M's
 // (pretending that one more is running) before injectglist.
 // Otherwise it can lead to the following situation:
 // injectglist grabs all P's but before it starts M's to run the P's,
 // another M returns from syscall, finishes running its G,
 // observes that there is no work to do and no other running M's
 // and reports deadlock.
 incidlelocked(-1)
 //将可读写fd关联的协程状态设置为ready
 injectglist(gp)
 incidlelocked(1)
 } 
 } 
 。。。。。。
}

netpoll的实现
netpoll执行epollWait,获取可读写的fd,返回可读写fd关联的协程

runtime/netpoll_epoll.go

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) *g {
 if epfd == -1 {
 return nil
 }
 waitms := int32(-1)
 if !block {
 waitms = 0
 }
 var events [128]epollevent
retry: 
 n := epollwait(epfd, &events[0], int32(len(events)), waitms)
 // print("epoll wait\n")
 if n < 0 {
 if n != -_EINTR {
 println("runtime: epollwait on fd", epfd, "failed with", -n)
 throw("runtime: netpoll failed")
 }
 goto retry
 }
 var gp guintptr
 for i := int32(0); i < n; i++ {
 ev := &events[i]
 if ev.events == 0 {
 continue
 }
 var mode int32
 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
 mode += 'r'
 }
 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
 mode += 'w'
 }
 if mode != 0 {
 pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
 //将pd关联的协程加入到gp协程链上
 netpollready(&gp, pd, mode)
 }
 }
 if block && gp == 0 {
 goto retry
 }
 return gp.ptr()
} 

injectglist的实现
injectglist将协程的状态设置为ready状态

runtime/proc.go

// Injects the list of runnable G's into the scheduler.
// Can run concurrently with GC.
func injectglist(glist *g) {
 if glist == nil {
 return 
 } 
 if trace.enabled {
 for gp := glist; gp != nil; gp = gp.schedlink.ptr() {
 traceGoUnpark(gp, 0)
 } 
 } 
 lock(&sched.lock)
 var n int 
 for n = 0; glist != nil; n++ {
 gp := glist
 glist = gp.schedlink.ptr()
 //将waiting状态的协程设置为runnable
 casgstatus(gp, _Gwaiting, _Grunnable)
 globrunqput(gp)
 } 
 unlock(&sched.lock)
 for ; n != 0 && sched.npidle != 0; n-- {
 startm(nil, false)
 } 
} 

服务端socket实现

net.ListenTCP的实现
ListenTCP调用socket函数,socket函数会通过系统调用创建socket、设置非阻塞、bind、listen

net/sock_posix.go

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
 //sysSocket函数会通过系统调用创建socket,并通过系统调用设置非阻塞
 s, err := sysSocket(family, sotype, proto)
 if err != nil {
 return nil, err 
 } 
 if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
 poll.CloseFunc(s)
 return nil, err 
 } 
 //为socket分配文件描述符fd
 if fd, err = newFD(s, family, sotype, net); err != nil {
 poll.CloseFunc(s)
 return nil, err 
 } 
 
 // This function makes a network file descriptor for the
 // following applications:
 // 
 // - An endpoint holder that opens a passive stream
 // connection, known as a stream listener
 // 
 // - An endpoint holder that opens a destination-unspecific
 // datagram connection, known as a datagram listener
 // 
 // - An endpoint holder that opens an active stream or a
 // destination-specific datagram connection, known as a
 // dialer
 // - An endpoint holder that opens the other connection, such
 // as talking to the protocol stack inside the kernel
 //
 // For stream and datagram listeners, they will only require
 // named sockets, so we can assume that it's just a request
 // from stream or datagram listeners when laddr is not nil but
 // raddr is nil. Otherwise we assume it's just for dialers or
 // the other connection holders.
 
 if laddr != nil && raddr == nil {
 switch sotype {
 case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
 //listenStream会通过系统调用bind绑定socket地址,通过系统调用listen
 //进行socket监听,通过fd.init()函数将fd加入epoll
 if err := fd.listenStream(laddr, listenerBacklog); err != nil {
 fd.Close()
 return nil, err
 }
 return fd, nil
 case syscall.SOCK_DGRAM:
 if err := fd.listenDatagram(laddr); err != nil {
 fd.Close()
 return nil, err
 }
 return fd, nil
 }
 }
 if err := fd.dial(ctx, laddr, raddr); err != nil {
 fd.Close()
 return nil, err
 }
 return fd, nil

Accept的实现

net/fd_unix.go

func (fd *netFD) accept() (netfd *netFD, err error) {
 //pfd.Accept会执行accept系统调用,返回新的socket连接,
 //并设置新的socket连接为非阻塞
 d, rsa, errcall, err := fd.pfd.Accept()
 if err != nil {
 if errcall != "" {
 err = wrapSyscallError(errcall, err)
 } 
 return nil, err 
 } 
 //为新的连接分配一个文件描述符 
 if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
 poll.CloseFunc(d)
 return nil, err 
 } 
 //通过netfd.init(),将accept新返回的socket fd添加到epoll
 if err = netfd.init(); err != nil {
 fd.Close()
 return nil, err 
 } 
 lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
 netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
 return netfd, nil 
} 

internal/poll/fd_unix.go

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
 if err := fd.readLock(); err != nil {
 return -1, nil, "", err 
 } 
 defer fd.readUnlock()
 
 if err := fd.pd.prepareRead(fd.isFile); err != nil {
 return -1, nil, "", err 
 } 
 for {
 //accept函数内部会执行accept系统调用
 //将返回的新的socket fd设置为非阻塞
 s, rsa, errcall, err := accept(fd.Sysfd)
 if err == nil {
 return s, rsa, "", err 
 } 
 switch err {
 //socket全连接队列为空
 case syscall.EAGAIN:
 if fd.pd.pollable() {
 //设置协程状态为wait
 if err = fd.pd.waitRead(fd.isFile); err == nil {
 continue
 } 
 } 
 case syscall.ECONNABORTED:
 // This means that a socket on the listen
 // queue was closed before we Accept()ed it;
 // it's a silly error, so try again.
 continue
 } 
 return -1, nil, errcall, err 
 } 
}

Read的实现

internal/poll/fd_unix.go

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
 if err := fd.readLock(); err != nil {
 return 0, err 
 } 
 defer fd.readUnlock()
 if len(p) == 0 { 
 // If the caller wanted a zero byte read, return immediately
 // without trying (but after acquiring the readLock).
 // Otherwise syscall.Read returns 0, nil which looks like
 // io.EOF.
 // TODO(bradfitz): make it wait for readability? (Issue 15735)
 return 0, nil 
 } 
 if err := fd.pd.prepareRead(fd.isFile); err != nil {
 return 0, err 
 } 
 if fd.IsStream && len(p) > maxRW {
 p = p[:maxRW]
 } 
 for {
 //执行read系统调用
 n, err := syscall.Read(fd.Sysfd, p)
 
 if err != nil {
 n = 0 
 if err == syscall.EAGAIN && fd.pd.pollable() {
 //socket fd没有数据可读,将协程状态设置为wait
 if err = fd.pd.waitRead(fd.isFile); err == nil {
 continue
 }
 }
 }
 
 err = fd.eofError(n, err)
 return n, err
 }
}

Write的实现

internal/poll/fd_unix.go

// Write implements io.Writer.
func (fd *FD) Write(p []byte) (int, error) {
 if err := fd.writeLock(); err != nil {
 return 0, err
 } 
 defer fd.writeUnlock()
 if err := fd.pd.prepareWrite(fd.isFile); err != nil {
 return 0, err
 } 
 var nn int
 for { 
 max := len(p)
 if fd.IsStream && max-nn > maxRW {
 max = nn + maxRW
 }
 //执行write系统调用
 n, err := syscall.Write(fd.Sysfd, p[nn:max])
 if n > 0 {
 nn += n
 }
 if nn == len(p) {
 return nn, err
 }
 if err == syscall.EAGAIN && fd.pd.pollable() {
 //socket fd不可写,将协程状态设置为wait
 if err = fd.pd.waitWrite(fd.isFile); err == nil {
 continue
 }
 }
 if err != nil {
 return nn, err
 }
 if n == 0 {
 return nn, io.ErrUnexpectedEOF
 }
 } 
} 

GPC模型总结

1 新建socket、accept的socket都设置为非阻塞
2.新建socket、accept的socket的fd都加入epoll

  1. Read、Write采用循环读写,如果返回EAGAIN,将协程状态设置为wait
  2. io线程定期执行sysmon,通过epollWait获取可读写的fd,将fd关联的协程设置为runable

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:UPDOWN_GG

查看原文:单机百万并发,golang 50行代码

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
6907 次点击
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏