1.概述
go 源码中带了rpc框架,以相对精简的当时方式实现了rpc功能,目前源码中的rpc官方已经宣布不再添加新功能,并推荐使用grpc.
作为go标准库中rpc框架,还是有很多地方值得借鉴及学习,这里将从源码角度分析go原生rpc框架,以及分享一些在使用过程中遇到的坑.
2.server端
server端主要分为两个步骤,首先进行方法注册,通过反射处理将方法取出,并存到map中.然后是网络调用,主要是监听端口,读取数据包,解码请求
调用反射处理后的方法,将返回值编码,返回给客户端.
2.1 方法注册
2.1.1 Register
// Register publishes the receiver's methods in the DefaultServer. func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) } // RegisterName is like Register but uses the provided name for the type // instead of the receiver's concrete type. func RegisterName(name string, rcvr interface{}) error { return DefaultServer.RegisterName(name, rcvr) }
如上,方法注册的入口函数有两个,分别为Register以及RegisterName,这里interface{}通常是带方法的对象.如果想要自定义方法的接收对象,则可以使用RegisterName.
2.1.2 反射处理过程
type methodType struct { sync.Mutex // protects counters method reflect.Method //反射后的函数 ArgType reflect.Type //请求参数的反射值 ReplyType reflect.Type //返回参数的反射值 numCalls uint //调用次数 } type service struct { name string // 服务名,这里通常为register时的对象名或自定义对象名 rcvr reflect.Value // 服务的接收者的反射值 typ reflect.Type // 接收者的类型 method map[string]*methodType // 对象的所有方法的反射结果. }
反射处理过程,其实就是将对象以及对象的方法,通过反射生成上面的结构,如注册Arith.Multiply(xx,xx) error 这样的对象时,生成的结构为 map["Arith"]service, service 中ethod为 map["Multiply"]methodType.
几个关键代码如下:
生成service对象
func (server *Server) register(rcvr interface{}, name string, useName bool) error { //生成service s := new(service) s.typ = reflect.TypeOf(rcvr) s.rcvr = reflect.ValueOf(rcvr) sname := reflect.Indirect(s.rcvr).Type().Name() .... s.name = sname // 通过suitableMethods将对象的方法转换成map[string]*methodType结构 s.method = suitableMethods(s.typ, true) .... //service存储为键值对 if _, dup := server.serviceMap.LoadOrStore(sname, s); dup { return errors.New("rpc: service already defined: " + sname) } return nil }
生成 map[string] *methodType
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType { methods := make(map[string]*methodType) //通过反射,遍历所有的方法 for m := 0; m < typ.NumMethod(); m++ { method := typ.Method(m) mtype := method.Type mname := method.Name // Method must be exported. if method.PkgPath != "" { continue } // Method needs three ins: receiver, *args, *reply. if mtype.NumIn() != 3 { if reportErr { log.Println("method", mname, "has wrong number of ins:", mtype.NumIn()) } continue } //取出请求参数类型 argType := mtype.In(1) ... // 取出响应参数类型,响应参数必须为指针 replyType := mtype.In(2) if replyType.Kind() != reflect.Ptr { if reportErr { log.Println("method", mname, "reply type not a pointer:", replyType) } continue } ... // 去除函数的返回值,函数的返回值必须为error. if returnType := mtype.Out(0); returnType != typeOfError { if reportErr { log.Println("method", mname, "returns", returnType.String(), "not error") } continue } //将方法存储成key-value methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType} } return methods }
2.2 网络调用
// Request 每次rpc调用的请求的头部分 type Request struct { ServiceMethod string // 格式为: "Service.Method" Seq uint64 // 客户端生成的序列号 next *Request // server端保持的链表 } // Response 每次rpc调用的响应的头部分 type Response struct { ServiceMethod string // 对应请求部分的 ServiceMethod Seq uint64 // 对应请求部分的 Seq Error string // 错误 next *Response // server端保持的链表 }
如上,网络调用主要用到上面的两个结构体,分别是请求参数以及返回参数,通过编解码器(gob/json)实现二进制到结构体的相互转换.主要涉及到下面几个步骤:
关键代码如下:
取出请求,并得到相应函数的调用参数
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) { // Grab the request header. req = server.getRequest() //编码器读取生成请求 err = codec.ReadRequestHeader(req) if err != nil { //错误处理 ... return } keepReading = true //取出服务名以及方法名 dot := strings.LastIndex(req.ServiceMethod, ".") if dot < 0 { err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod) return } serviceName := req.ServiceMethod[:dot] methodName := req.ServiceMethod[dot+1:] //从注册时生成的map中查询出相应的方法的结构 svci, ok := server.serviceMap.Load(serviceName) if !ok { err = errors.New("rpc: can't find service " + req.ServiceMethod) return } svc = svci.(*service) //获取出方法的类型 mtype = svc.method[methodName] if mtype == nil { err = errors.New("rpc: can't find method " + req.ServiceMethod) }
循环处理,不断读取链接上的字节流,解密出请求,调用方法,编码响应,回写到客户端.
func (server *Server) ServeCodec(codec ServerCodec) { sending := new(sync.Mutex) for { //读取请求 service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) if err != nil { ... } //调用 go service.call(server, sending, mtype, req, argv, replyv, codec) } codec.Close() }
通过参数进行函数调用
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) { mtype.Lock() mtype.numCalls++ mtype.Unlock() function := mtype.method.Func // 通过反射进行函数调用 returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) // 返回值是不为空时,则取出错误的string errInter := returnValues[0].Interface() errmsg := "" if errInter != nil { errmsg = errInter.(error).Error() } //发送相应,并释放请求结构 server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) server.freeRequest(req) }
3.client端
// 异步调用 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { } // 同步调用 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { }
// Call represents an active RPC. type Call struct { ServiceMethod string // 服务名及方法名 格式:服务.方法 Args interface{} // 函数的请求参数 (*struct). Reply interface{} // 函数的响应参数 (*struct). Error error // 方法完成后 error的状态. Done chan *Call // 方法调用结束后的channel. }
client端部分则相对要简单很多,主要提供Call以及Go两个方法,分别表示同步调用以及异步调用,但其实同步调用底层实现其实也是异步调用,调用时主要用到了Call结构,相关解释如上.
3.1 主要流程
3.2 关键代码
发送请求部分代码,每次send一次请求,均生成一个call对象,并使用seq作为key保存在map中,服务端返回时从map取出call,进行相应处理.
func (client *Client) send(call *Call) { //请求级别的锁 client.reqMutex.Lock() defer client.reqMutex.Unlock() // Register this call. client.mutex.Lock() if client.shutdown || client.closing { call.Error = ErrShutdown client.mutex.Unlock() call.done() return } //生成seq,每次调用均生成唯一的seq,在服务端相应后会通过该值进行匹配 seq := client.seq client.seq++ client.pending[seq] = call client.mutex.Unlock() // 请求并发送请求 client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod err := client.codec.WriteRequest(&client.request, call.Args) if err != nil { //发送请求错误时,将map中call对象删除. client.mutex.Lock() call = client.pending[seq] delete(client.pending, seq) client.mutex.Unlock() if call != nil { call.Error = err call.done() } } }
接收响应部分的代码,这里是一个for循环,不断读取tcp上的流,并解码成Response对象以及方法的Reply对象.
func (client *Client) input() { var err error var response Response for err == nil { response = Response{} err = client.codec.ReadResponseHeader(&response) if err != nil { break } //通过response中的 Seq获取call对象 seq := response.Seq client.mutex.Lock() call := client.pending[seq] delete(client.pending, seq) client.mutex.Unlock() switch { case call == nil: err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } case response.Error != "": //服务端返回错误,直接将错误返回 call.Error = ServerError(response.Error) err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } call.done() default: //通过编码器,将Resonse的body部分解码成reply对象. err = client.codec.ReadResponseBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } // 客户端退出处理 client.reqMutex.Lock() client.mutex.Lock() client.shutdown = true closing := client.closing if err == io.EOF { if closing { err = ErrShutdown } else { err = io.ErrUnexpectedEOF } } for _, call := range client.pending { call.Error = err call.done() } client.mutex.Unlock() client.reqMutex.Unlock() if debugLog && err != io.EOF && !closing { log.Println("rpc: client protocol error:", err) } }
4.一些坑
- 同步调用无法超时
由于原生rpc只提供两个方法,同步的Call以及异步的Go,同步的Call服务端不返回则会一直阻塞,这里如果存在大量的不返回,会导致协程一直无法释放.
- 异步调用超时后会内存泄漏
基于异步调用加channel实现超时功能也会存在泄漏问题,原因是client的请求会存在map结构中,Go函数退出并不会清理map的内容,因此如果server端不返回的话,map中的请求会一直存储,从而导致内存泄漏.
5. 总结
总的来说,go原生rpc算是个基础版本的rpc,代码精简,可扩展性高,但是只是实现了rpc最基本的网络通讯,像超时熔断,链接管理(保活与重连),服务注册发现,还是欠缺的,因此还是达不到生产环境开箱即用,不过git就有一个基于rpc的功能增强版本,叫rpcx,支持了大部分主流rpc的特性.
6. 参考
rpc https://golang.org/pkg/net/rpc/