分享
  1. 首页
  2. 文章

Golang 1.4 net/rpc server源码解析

毛, 剑 · · 6634 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

上一篇文章我们讲了net/rpc中client部分的实现,我本机源码安装路径在/usr/local/go,这net/rpc(golang 1.4版本)涉及到的相关代码主要有:

server.go

方法注册:

因为从client我们知道是复用的socket来实现并发调用rpc方法,我们先从方法注册来看源码部分:

// Server对象大都是保存方法存根,保证对象互斥的
type Server struct { 
 mu sync.RWMutex // protects the serviceMap 
 serviceMap map[string]*service 
 reqLock sync.Mutex // protects freeReq 
 freeReq *Request 
 respLock sync.Mutex // protects freeResp 
 freeResp *Response 
}
func NewServer() *Server { 
 return &Server{serviceMap: make(map[string]*service)} 
} 
// rpc.Register默认使用了一个Server,只对serviceMap进行了初始化 
var DefaultServer = NewServer()
// rpc的service包括方法名、方法反射,类型等
type service struct { 
 name string // name of service 
 rcvr reflect.Value // receiver of methods for the service 
 typ reflect.Type // type of the receiver 
 method map[string]*methodType // registered methods 
}
// 无论是RegisterName、Register最终都调用了register的内部方法
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
 // 保证注册服务安全,先加锁
 server.mu.Lock() 
 defer server.mu.Unlock() 
 // 如果服务为空,默认注册一个 
 if server.serviceMap == nil { 
 server.serviceMap = make(map[string]*service) 
 }
 // 获取注册服务的反射信息 
 s := new(service) 
 s.typ = reflect.TypeOf(rcvr) 
 s.rcvr = reflect.ValueOf(rcvr) 
 // 可以使用自定义名称 
 sname := reflect.Indirect(s.rcvr).Type().Name() 
 if useName { 
 sname = name 
 } 
 if sname == "" { 
 s := "rpc.Register: no service name for type " + s.typ.String() 
 log.Print(s) 
 return errors.New(s) 
 } 
 // 方法必须是暴露的,既服务名首字符大写 
 if !isExported(sname) && !useName { 
 s := "rpc.Register: type " + sname + " is not exported" 
 log.Print(s) 
 return errors.New(s) 
 } 
 // 不允许重复注册 
 if _, present := server.serviceMap[sname]; present { 
 return errors.New("rpc: service already defined: " + sname) 
 } 
 s.name = sname 
 // 开始注册rpc struct内部的方法存根 
 s.method = suitableMethods(s.typ, true) 
 // 如果struct内部一个方法也没,那么直接报错,错误信息还非常详细 
 if len(s.method) == 0 { 
 str := "" 
 // To help the user, see if a pointer receiver would work. 
 method := suitableMethods(reflect.PtrTo(s.typ), false) 
 if len(method) != 0 { 
 str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
 } else { 
 str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
 } 
 log.Print(str) 
 return errors.New(str) 
 }
 // 保存在server的serviceMap中 
 server.serviceMap[s.name] = s 
 return nil 
} 
// 上文提到了服务还需要方法存根的注册
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
 // 根据方法名创建保存内部方法map
 methods := make(map[string]*methodType)
 // 获取rpc struct内部的方法 
 for m := 0; m < typ.NumMethod(); m++ { 
 method := typ.Method(m) 
 mtype := method.Type 
 mname := method.Name 
 // 之前对这行代码觉得比较奇葩,方法是否是暴露,是看是否有PkgPath的,如果是私有方法,PkgPath显示包名 
 if method.PkgPath != "" { 
 continue 
 } 
 // 判断是否是三个参数:第一个是结构本身,第二个是参数,第三个是返回值 
 // Method needs three ins: receiver, *args, *reply. 
 if mtype.NumIn() != 3 { 
 if reportErr { 
 log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
 } 
 continue 
 } 
 // args是指针类型 
 // First arg need not be a pointer. 
 argType := mtype.In(1) 
 if !isExportedOrBuiltinType(argType) { 
 if reportErr { 
 log.Println(mname, "argument type not exported:", argType) 
 } 
 continue 
 } 
 // reply是指针类型 
 // Second arg must be a pointer. 
 replyType := mtype.In(2) 
 if replyType.Kind() != reflect.Ptr { 
 if reportErr { 
 log.Println("method", mname, "reply type not a pointer:", replyType)
 } 
 continue 
 } 
 // Reply type must be exported. 
 // reply必须是可暴露的 
 if !isExportedOrBuiltinType(replyType) { 
 if reportErr { 
 log.Println("method", mname, "reply type not exported:", replyType)
 } 
 continue 
 } 
 // Method needs one out. 
 // 必须有一个返回值,而且要是error 
 if mtype.NumOut() != 1 { 
 if reportErr { 
 log.Println("method", mname, "has wrong number of outs:", mtype.NumOut())
 } 
 continue 
 } 
 // The return type of the method must be error. 
 if returnType := mtype.Out(0); returnType != typeOfError { 
 if reportErr { 
 log.Println("method", mname, "returns", returnType.String(), "not error")
 } 
 continue 
 } 
 methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
 } 
 return methods 
}

请求调用:

方法已经被注册成功,接下来我们看看是如何客户端发送请求调用的:

func (server *Server) Accept(lis net.Listener) { 
 for { 
 conn, err := lis.Accept() 
 if err != nil { 
 log.Fatal("rpc.Serve: accept:", err.Error()) // TODO(r): exit? 
 }
 // accept连接以后,打开一个goroutine处理请求 
 go server.ServeConn(conn) 
 } 
} 
func (server *Server) ServeConn(conn io.ReadWriteCloser) { 
 buf := bufio.NewWriter(conn) 
 srv := &gobServerCodec{ 
 rwc: conn, 
 dec: gob.NewDecoder(conn), 
 enc: gob.NewEncoder(buf), 
 encBuf: buf, 
 } 
 // 根据指定的codec进行协议解析 
 server.ServeCodec(srv) 
} 
func (server *Server) ServeCodec(codec ServerCodec) { 
 sending := new(sync.Mutex) 
 for { 
 // 解析请求 
 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
 if err != nil { 
 if debugLog && err != io.EOF { 
 log.Println("rpc:", err) 
 } 
 if !keepReading { 
 break 
 } 
 // send a response if we actually managed to read a header. 
 // 如果当前请求错误了,我们应该返回信息,然后继续处理 
 if req != nil { 
 server.sendResponse(sending, req, invalidRequest, codec, err.Error())
 server.freeRequest(req) 
 } 
 continue 
 } 
 // 因为需要继续处理后续请求,所以开一个gorutine处理rpc方法 
 go service.call(server, sending, mtype, req, argv, replyv, codec) 
 }
 // 如果连接关闭了需要释放资源 
 codec.Close() 
} 
func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, keepReading bool, err error) {
 // 解析头部,如果失败,直接返回了 
 req = server.getRequest() 
 err = codec.ReadRequestHeader(req) 
 if err != nil { 
 req = nil 
 if err == io.EOF || err == io.ErrUnexpectedEOF { 
 return 
 } 
 err = errors.New("rpc: server cannot decode request: " + err.Error()) 
 return 
 } 
 if debugLog { 
 log.Printf("rpc: [trace:%v]\n", req.Tracer) 
 } 
 // We read the header successfully. If we see an error now, 
 // we can still recover and move on to the next request. 
 keepReading = true 
 // 获取请求中xxx.xxx中.的位置 
 dot := strings.LastIndex(req.ServiceMethod, ".") 
 if dot < 0 { 
 err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
 return 
 } 
 // 拿到struct名字和方法名字 
 serviceName := req.ServiceMethod[:dot] 
 methodName := req.ServiceMethod[dot+1:] 
 // Look up the request.
 // 加读锁,获取对象 
 server.mu.RLock() 
 service = server.serviceMap[serviceName] 
 server.mu.RUnlock() 
 if service == nil { 
 err = errors.New("rpc: can't find service " + req.ServiceMethod) 
 return 
 } 
 // 获取反射类型,看见rpc中的发射其实是预先放入map中的 
 mtype = service.method[methodName] 
 if mtype == nil { 
 err = errors.New("rpc: can't find method " + req.ServiceMethod) 
 } 
 return 
}
func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
 service, mtype, req, keepReading, err = server.readRequestHeader(codec) 
 if err != nil { 
 if !keepReading { 
 return 
 } 
 // discard body 
 codec.ReadRequestBody(nil) 
 return 
 } 
 // 解析请求中的args 
 argIsValue := false // if true, need to indirect before calling. 
 if mtype.ArgType.Kind() == reflect.Ptr { 
 argv = reflect.New(mtype.ArgType.Elem()) 
 } else { 
 argv = reflect.New(mtype.ArgType) 
 argIsValue = true 
 } 
 // argv guaranteed to be a pointer now. 
 if err = codec.ReadRequestBody(argv.Interface()); err != nil { 
 return 
 } 
 if argIsValue { 
 argv = argv.Elem() 
 } 
 // 初始化reply类型 
 replyv = reflect.New(mtype.ReplyType.Elem()) 
 return 
}
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
 mtype.Lock() 
 mtype.numCalls++ 
 mtype.Unlock() 
 function := mtype.method.Func 
 // Invoke the method, providing a new value for the reply. 
 // 这里是真正调用rpc方法的地方 
 returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) 
 // The return value for the method is an error. 
 errInter := returnValues[0].Interface() 
 errmsg := "" 
 if errInter != nil { 
 errmsg = errInter.(error).Error() 
 } 
 // 处理返回请求了 
 server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) 
 server.freeRequest(req) 
} 
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
 resp := server.getResponse() 
 // Encode the response header 
 resp.ServiceMethod = req.ServiceMethod 
 if errmsg != "" { 
 resp.Error = errmsg 
 reply = invalidRequest 
 } 
 // 上一文提到,客户端是根据序号来定位请求的,所以需要原样返回 
 resp.Seq = req.Seq 
 sending.Lock() 
 err := codec.WriteResponse(resp, reply) 
 if debugLog && err != nil { 
 log.Println("rpc: writing response:", err) 
 } 
 sending.Unlock() 
 server.freeResponse(resp) 
} 

资源重用:

上面把大致的rpc请求都说明了,server有一个技巧是重用对象,这里使用的是链表方式处理的:

// 可以看出使用一个free list链表,来避免Request以及Response对象频繁创建,导致GC压力
func (server *Server) getRequest() *Request { 
 server.reqLock.Lock() 
 req := server.freeReq 
 if req == nil { 
 req = new(Request) 
 } else { 
 server.freeReq = req.next 
 *req = Request{} 
 } 
 server.reqLock.Unlock() 
 return req 
} 
func (server *Server) freeRequest(req *Request) { 
 server.reqLock.Lock() 
 req.next = server.freeReq 
 server.freeReq = req 
 server.reqLock.Unlock() 
} 
func (server *Server) getResponse() *Response { 
 server.respLock.Lock() 
 resp := server.freeResp 
 if resp == nil { 
 resp = new(Response) 
 } else { 
 server.freeResp = resp.next 
 *resp = Response{} 
 } 
 server.respLock.Unlock() 
 return resp 
} 
func (server *Server) freeResponse(resp *Response) { 
 server.respLock.Lock() 
 resp.next = server.freeResp 
 server.freeResp = resp 
 server.respLock.Unlock() 
} 

最后,sending这把锁的目的是避免同一个套接字快速请求中避免返回包写入乱序,因此避免一个包完整写入完毕才允许下一个返回写入套接字。通过rpc包源码解析,可以看到标准库中的核心思想还是channel+mutex实现复用对象,以及各种方式的复用,避免GC压力,在我们以后写高性能服务端可以借鉴的地方。


有疑问加站长微信联系(非本文作者)

本文来自:猎豹移动技术博客

感谢作者:毛, 剑

查看原文:Golang 1.4 net/rpc server源码解析

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
6634 次点击 ∙ 1 赞
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏