分享
  1. 首页
  2. 文章

fasthttp 的 goroutine pool 实现探究

伯乐在线 · · 2964 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本文作者: 伯乐在线 - legendtkl 。未经作者许可,禁止转载!
欢迎加入伯乐在线 专栏作者

引言

fasthttp是一个非常优秀的web server框架,号称比官方的net/http快10倍以上。fasthttp用了很多黑魔法。俗话说,源码面前,了无秘密,我们今天通过源码来看一看她的goroutine pool的实现。

热身

fasthttp写server和原生的net/http写法上基本没有区别,这里就不举例子。直接找到入口函数,在根目录下的server.go文件中,我们从函数ListenAndServe()跟踪进去。从端口监听到处理请求的函数调用链如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func ListenAndServe(addr string,handler RequestHandler)error{
s:=&Server{
Handler:handler,
}
returns.ListenAndServe(addr)
}
// ListenAndServe serves HTTP requests from the given TCP addr.
func(s *Server)ListenAndServe(addr string)error{
ln,err:=net.Listen("tcp",addr)
iferr!=nil{
returnerr
}
returns.Serve(ln)
}
// Serve blocks until the given listener returns permanent error.
func(s *Server)Serve(ln net.Listener)error{
...
wp:=&workerPool{
WorkerFunc:s.serveConn,
MaxWorkersCount:maxWorkersCount,
LogAllErrors:s.LogAllErrors,
Logger:s.logger(),
}
wp.Start()//启动worker pool
for{
ifc,err=acceptConn(s,ln,&lastPerIPErrorTime);err!=nil{
wp.Stop()
iferr==io.EOF{
returnnil
}
returnerr
}
if!wp.Serve(c){
s.writeFastError(c,StatusServiceUnavailable,
"The connection cannot be served because Server.Concurrency limit exceeded")
c.Close()
iftime.Since(lastOverflowErrorTime)>time.Minute{
s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
"Try increasing Server.Concurrency",maxWorkersCount)
lastOverflowErrorTime=time.Now()
}
time.Sleep(100*time.Millisecond)
}
c=nil
}
}

上面代码中workerPool就是一个线程池。相关代码在server.go文件的同级目录下的workerpool.go文件中。我们从上面代码涉及到的往下看。首先是workerPool struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typeworkerPoolstruct{
WorkerFunc func(cnet.Conn)error
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time.Duration
Logger Logger
lock sync.Mutex
workersCountint
mustStopbool
ready[]*workerChan
stopChchanstruct{}
workerChanPool sync.Pool
}
typeworkerChanstruct{
lastUseTime time.Time
ch chan net.Conn
}

workerPool sturct中的WorkerFunc是conn的处理函数,类似net/http包中的ServeHTTP。因为所有conn的处理都是一样的,所以WorkerFunc不需要和传入的每个conn绑定,整个worker pool共用一个。workerChanPool是sync.Pool对象池。

MaxIdleWorkerDuration是worker空闲的最长时间,超过就将worker关闭。workersCount是worker的数量。ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。wp.Start()启动worker pool。wp.Stop()是出错处理。wp.Serve(c)是对conn进行处理的函数。我们先看一下wp.Start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func(wp *workerPool)Start(){
ifwp.stopCh!=nil{
panic("BUG: workerPool already started")
}
wp.stopCh=make(chanstruct{})
stopCh:=wp.stopCh
go func(){
varscratch[]*workerChan
for{
wp.clean(&scratch)
select{
case<-stopCh:
return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
func(wp *workerPool)Stop(){
...
close(wp.stopCh)
wp.stopCh=nil
wp.lock.Lock()
ready:=wp.ready
fori,ch:=rangeready{
ch.ch<-nil
ready[i]=nil
}
wp.ready=ready[:0]
wp.mustStop=true
wp.lock.Unlock()
}

简单来说,wp.Start()启动了一个goroutine,负责定期清理worker pool中过期worker(过期=未使用时间超过MaxIdleWorkerDuration)。清理操作都在wp.clean()函数中完成,这里就不继续往下看了。stopCh是一个标示worker pool停止的chan。上面的for-select-stop是很常用的方式。wp.Stop()负责停止worker pool的处理工作,包括关闭stopCh,清理闲置的worker列表(这时候还有一部分worker在处理conn,待其处理完成通过判断wp.mustStop来停止)。这里需要注意的一点是做资源清理的时候,对于channel需要置nil。下面看看最重要的函数wp.Serve()

核心

下面是wp.Serve()函数的调用链。wp.Serve()负责处理来自client的每一条连接。其中比较关键的函数是wp.getCh(),她从worker pool的可用空闲worker列表尾部取出一个可用的worker。这里有几个逻辑需要注意的是:1.如果没有可用的worker(比如处理第一个conn是,worker pool还是空的)则新建;2.如果worker达到上限,则直接不处理(这个地方感觉略粗糙啊!)。go func()那几行代码就是新建worker,我们放到下面说。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func(wp *workerPool)Serve(cnet.Conn)bool{
ch:=wp.getCh()
ifch==nil{
returnfalse
}
ch.ch<-c
returntrue
}
func(wp *workerPool)getCh()*workerChan{
varch *workerChan
createWorker:=false
wp.lock.Lock()
ready:=wp.ready
n:=len(ready)-1
ifn<0{
ifwp.workersCount<wp.MaxWorkersCount{
createWorker=true
wp.workersCount++
}
}else{
ch=ready[n]
ready[n]=nil
wp.ready=ready[:n]
}
wp.lock.Unlock()
ifch==nil{
if!createWorker{
returnnil
}
vch:=wp.workerChanPool.Get()
ifvch==nil{
vch=&workerChan{
ch:make(chan net.Conn,workerChanCap),
}
}
ch=vch.(*workerChan)
go func(){
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
returnch
}

workerFunc()函数定义如下(去掉了很多不影响主线的逻辑),结合上一篇《如何裸写一个goroutine pool》,还是熟悉的配方,熟悉的味道。这里要看的wp.release()是干啥的。因为前面的wp.Serve()函数只处理一个conn,所以for循环执行一次我们就可以把worker放到空闲队列中去等待下一次conn过来,从代码中可以看出来放回果然是放到空闲队列的末尾(可算和上面呼应上了)。还有上面提到的mustStop,如果worker pool停止了,mustStop就为true,那么workerFunc就要跳出循环,也就是goroutine结束了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func(wp *workerPool)workerFunc(ch *workerChan){
varcnet.Conn
varerr error
forc=range ch.ch{
ifc==nil{
break
}
...
c=nil
if!wp.release(ch){
break
}
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}
func(wp *workerPool)release(ch *workerChan)bool{
ch.lastUseTime=time.Now()
wp.lock.Lock()
ifwp.mustStop{
wp.lock.Unlock()
returnfalse
}
wp.ready=append(wp.ready,ch)
wp.lock.Unlock()
returntrue
}

总结

除了fasthttp,我还看了github上其他开源且star数在100以上的goroutine pool的实现,基本核心原理都在我上一篇文章中说的那些。fasthttp的实现多了一层goroutine回收机制,不得不说确实挺巧妙。fasthttp性能这么好一定是有其原因的,源码之后再慢慢读。


有疑问加站长微信联系(非本文作者)

本文来自:伯乐在线

感谢作者:伯乐在线

查看原文:fasthttp 的 goroutine pool 实现探究

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
2964 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏