分享
  1. 首页
  2. 文章

如果裸写一个goroutine pool

伯乐在线 · · 2409 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本文作者: 伯乐在线 - legendtkl 。未经作者许可,禁止转载!
欢迎加入伯乐在线 专栏作者

引言

在上文中,我说到golang的原生http server处理client的connection的时候,每个connection起一个goroutine,这是一个相当粗暴的方法。为了感受更深一点,我们来看一下go的源码。先定义一个最简单的http server如下。

1
2
3
4
5
6
7
8
func myHandler(whttp.ResponseWriter,r *http.Request){
fmt.Fprintf(w,"Hello there!\n")
}
func main(){
http.HandleFunc("/",myHandler)// 设置访问路由
log.Fatal(http.ListenAndServe(":8080",nil))
}

从入口http.ListenAndServe函数跟进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// file: net/http/server.go
func ListenAndServe(addr string,handler Handler)error{
server:=&Server{Addr:addr,Handler:handler}
returnserver.ListenAndServe()
}
func(srv *Server)ListenAndServe()error{
addr:=srv.Addr
ifaddr==""{
addr=":http"
}
ln,err:=net.Listen("tcp",addr)
iferr!=nil{
returnerr
}
returnsrv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}
func(srv *Server)Serve(lnet.Listener)error{
deferl.Close()
...
for{
rw,e:=l.Accept()
ife!=nil{
// error handle
returne
}
tempDelay=0
c,err:=srv.newConn(rw)
iferr!=nil{
continue
}
c.setState(c.rwc,StateNew)// before Serve can return
goc.serve()
}
}

首先net.Listen负责监听网络端口,rw, e := l.Accept()则从网络端口中取出TCP连接,然后go c.server()则对每一个TCP连接起一个goroutine来处理。我还说到fasthttp这个网络框架性能要比原生的net/http性能要好,其中一个原因就是因为使用了goroutine pool。那么问题来了,如果要我们自己去实现一个goroutine pool,该怎么去实现呢?我们先来实现一个最简单的。

弱鸡版

golang中的goroutine通过go来启动,goroutine资源和临时对象池不一样,不能放回去再取出来。所以goroutine应该是一直运行着的。需要的时候就运行,不需要的时候就阻塞,这样对其他的goroutine的调度影响也不是很大。而goroutine的任务可以通过channel来传递就ok了。很简单的弱鸡版本就出来了,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Gopool(){
start:=time.Now()
wg:=new(sync.WaitGroup)
data:=make(chan int,100)
fori:=0;i<10;i++{
wg.Add(1)
go func(nint){
defer wg.Done()
for_=rangedata{
fmt.Println("goroutine:",n,i)
}
}(i)
}
fori:=0;i<10000;i++{
data<-i
}
close(data)
wg.Wait()
end:=time.Now()
fmt.Println(end.Sub(start))
}

上面的代码中还做了程序运行时间统计。作为对比,下面是一个没有使用pool的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func Nopool(){
start:=time.Now()
wg:=new(sync.WaitGroup)
fori:=0;i<10000;i++{
wg.Add(1)
go func(nint){
defer wg.Done()
//fmt.Println("goroutine", n)
}(i)
}
wg.Wait()
end:=time.Now()
fmt.Println(end.Sub(start))
}

最后运行时间对比,使用了goroutine pool的代码运行时间约为没有使用pool的代码的2/3。当然这么测试还是略显粗糙了。我们下面使用reflect那篇文章里面介绍的go benchmark testing的方式测试,测试代码如下(去掉了很多无关代码)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
packagepool
import(
"sync"
"testing"
)
func Gopool(){
wg:=new(sync.WaitGroup)
data:=make(chan int,100)
fori:=0;i<10;i++{
wg.Add(1)
go func(nint){
defer wg.Done()
for_=rangedata{
}
}(i)
}
fori:=0;i<10000;i++{
data<-i
}
close(data)
wg.Wait()
}
func Nopool(){
wg:=new(sync.WaitGroup)
fori:=0;i<10000;i++{
wg.Add(1)
go func(nint){
defer wg.Done()
}(i)
}
wg.Wait()
}
func BenchmarkGopool(b *testing.B){
fori:=0;i<b.N;i++{
Gopool()
}
}
func BenchmarkNopool(b *testing.B){
fori:=0;i<b.N;i++{
Nopool()
}
}

最终的测试结果如下,使用了goroutine pool的代码执行时间确实更短。

1
2
3
4
5
$go test-bench='.'gopool_test.go
BenchmarkGopool-85002596750ns/op
BenchmarkNopool-85003604035ns/op
PASS

升级版

对于一个好的线程池,我们往往有更多的需求,一个最迫切的需求是能自定义goroutine运行的函数。函数无非就是函数地址和函数参数。如果要传入的函数形式不一样(形参或者返回值不一样)怎么办?一个比较简单的方法是引入反射。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
typeworkerstruct{
Funcinterface{}
Args[]reflect.Value
}
func main(){
varwg sync.WaitGroup
channels:=make(chan worker,10)
fori:=0;i<5;i++{
wg.Add(1)
go func(){
defer wg.Done()
forch:=rangechannels{
reflect.ValueOf(ch.Func).Call(ch.Args)
}
}()
}
fori:=0;i<100;i++{
wk:=worker{
Func:func(x,yint){
fmt.Println(x+y)
},
Args:[]reflect.Value{reflect.ValueOf(i),reflect.ValueOf(i)},
}
channels<-wk
}
close(channels)
wg.Wait()
}

但是引入反射又会引入性能问题。本来goroutine pool就是为了解决性能问题,然而现在又引入了新的性能问题。那么怎么办呢?闭包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
typeworkerstruct{
Func func()
}
func main(){
varwg sync.WaitGroup
channels:=make(chan worker,10)
fori:=0;i<5;i++{
wg.Add(1)
go func(){
defer wg.Done()
forch:=rangechannels{
//reflect.ValueOf(ch.Func).Call(ch.Args)
ch.Func()
}
}()
}
fori:=0;i<100;i++{
j:=i
wk:=worker{
Func:func(){
fmt.Println(j+j)
},
}
channels<-wk
}
close(channels)
wg.Wait()
}

这里值得注意的一点是golang的闭包用不好容易把自己代入坑,而理解闭包一个很关键的点就是对对象的引用而不是复制。这里只是goroutine pool 实现的一个精简版,真正实现的时候还有很多细节需要考虑,比如设置一个stop channel用来停止pool,但是goroutine pool的核心就在于这个地方。

goroutine池和CPU核的关系

那么goroutine池里面goroutine数目和核数有没有关系呢?这个其实要分情况讨论。

1.goroutine池跑不满

这也就意味着channel data里面一有数据就会被goroutine拿走,这样的话当然只能你CPU能调度的过来就行,也就是池子里的goroutine数目和CPU核数是最优的。经测试,确实是这样。

2.channel data有数据阻塞

这意思是说goroutine是不够用的,如果goroutine的运行任务不是CPU密集型的(大部分情况都不是),而只是IO阻塞,这个时候一般goroutine数目在一定范围内是越多越好,当然范围在什么地方就要具体情况具体分析了。


有疑问加站长微信联系(非本文作者)

本文来自:伯乐在线

感谢作者:伯乐在线

查看原文:如果裸写一个goroutine pool

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
2409 次点击 ∙ 1 赞
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏