最近在学习 rpc 相关的知识,如果让我去从头设计一个 rpc,我从使用者的角度出发,究竟需要去做一下什么工作?
第一,RPC 本质上就是一个远程调用,那肯定就需要通过网络来传输数据。虽然传输协议可以有多种选择,但考虑到可靠性的话,我们一般默认采用 TCP 协议。为了屏蔽网络传输的复杂性,我们需要封装一个单独的数据传输模块用来收发二进制数据,这个单独模块我们可以叫做传输模块。
第二,用户请求的时候是基于方法调用,方法出入参数都是对象数据,对象是肯定没法直接在网络中传输的,我们需要提前把它转成可传输的二进制,这就是我们说的序列化过程。但只是把方法调用参数的二进制数据传输到服务提供方是不够的,我们需要在方法调用参数的二进制数据后面增加“断句”符号来分隔出不同的请求,在两个“断句”符号中间放的内容就是我们请求的二进制数据,这个过程我们叫做协议封装。
第三,为了传输性能的进一步提高,我们还可以在协议模块中加入压缩功能,这是因为压缩过程也是对传输的二进制数据进行操作。在实际的网络传输过程中,我们的请求数据包在数据链路层可能会因为太大而被拆分成多个数据包进行传输,为了减少被拆分的次数,从而导致整个传输过程时间太长的问题,这就是数据压缩和解压。
第四,在针对服务提供方不断增多的情况,会出现一个接口对应多个IP+端口的服务地址映射关系,但这多个服务提供者对于我们的调用方来说是透明的,所以在 RPC 里面我们还需要给调用方找到所有的服务提供方,并需要在 RPC 里面维护好接口跟服务提供者地址的关系,这样调用方在发起请求的时候才能快速地找到对应的接收地址,这就是我们常说的服务发现。
第五,作为一个成熟的 rpc 框架,还需要给开发者提供各种各样能够深入介入 rpc 框架内部的接口调用。这里可以看看 grpc 各种各样的 wrapper 封装,在传输,编码,协议,服务发现,日志打印都留有标准统一的对外接口,让开发人员可以自定义开发。
在考虑完以上问题后,我对 grpc 的实现比较好奇,于是搭建了 client 和 service 想从代码的维度上去查看一下 grpc 在 rpc 上是怎么实现的?
client代码
package main import ( "context" "fmt" "log" "time" "grpc-test/client/service/pbservice" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func main() { // 1. 新建连接,端口是服务端开放的8082端口 // 并且添加grpc.WithInsecure(),不然没有证书会报错 conn, err := grpc.Dial(":8082", grpc.WithInsecure()) if err != nil { log.Fatal(err) } // 退出时关闭链接 defer conn.Close() // 2. 调用Product.pb.go中的NewProdServiceClient方法 productServiceClient := pbservice.NewProdServiceClient(conn) // 设置了超时 15s ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(10 * time.Second))) resp, err := productServiceClient.GetProductStock(ctx, &pbservice.ProductRequest{ProdId: 233}) defer cancel() if err != nil { state, ok := status.FromError(err) if ok && state.Code() == codes.DeadlineExceeded { log.Fatalln("client.Search err: deadline") } log.Fatal("调用gRPC方法错误: ", err) } fmt.Println("调用gRPC方法成功,ProdStock = ", resp.ProdStock) }
service代码
package main import ( "google.golang.org/grpc" "grpc-test/server/pbservice/product" "net" "log" ) func main() { rpcServer := grpc.NewServer() //将新建的ProdService注册进去,注意这里的 RegisterProdServiceServer 方法是将新生成的 service 中的方法 product.RegisterProdServiceServer(rpcServer, new(product.ProdService)) listener, err := net.Listen("tcp", ":8082") if err != nil { log.Fatal("服务监听端口失败", err) } // 运行rpcServer,传入listener _ = rpcServer.Serve(listener) }
接下来我们追踪一下 client 部分的代码:
# 创建连接 conn, err := grpc.Dial(":8082", grpc.WithInsecure()) if err != nil { log.Fatal(err) } # 具体的初始化操作在grpc.DialContext方法中。 # DialContext首先初始化空对象ClientConn,然后判断opts …DialOption数据是否存在,如果存在就执行传入的函数并设置特定属性。 // Dial creates a client connection to the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...) }
在 clientconn.go 中创建ClientConn对象,对应结构体包括的字段
// ClientConn represents a virtual connection to a conceptual endpoint, to // perform RPCs. // // A ClientConn is free to have zero or more actual connections to the endpoint // based on configuration, load, etc. It is also free to determine which actual // endpoints to use and may change it every RPC, permitting client-side load // balancing. // // A ClientConn encapsulates a range of functionality including name // resolution, TCP connection establishment (with retries and backoff) and TLS // handshakes. It also handles errors on established connections by // re-resolving the name and reconnecting. type ClientConn struct { ctx context.Context cancel context.CancelFunc target string parsedTarget resolver.target // 负载均衡选择 authority string dopts dialOptions // 初始化可设置选项,在每一次请求会带上,看call.go中的combine方法 csMgr *connectivityStateManager // 连接状态维护 balancerBuildOpts balancer.BuildOptions blockingpicker *pickerWrapper // 负载均衡设置 mu sync.RWMutex resolverWrapper *ccResolverWrapper // 实现了resolver.ClientConn,位于./resolver/resolver.go中,ClientConn的上层包装器 sc *ServiceConfig conns map[*addrConn]struct{} // 存放连接的地方 // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters curBalancerName string balancerWrapper *ccBalancerWrapper // 负载均衡器上的包装器 retryThrottler atomic.Value firstResolveEvent *grpcsync.Event channelzID int64 // channelz unique identification number czData *channelzData }
当pb自动生成的代码中,调用方法的时候,grpc 会将其请求代理替换成自己封装的网络层上请求,使用连接的 Invoke() 方法
func (c *prodServiceClient) GetProductStock(ctx context.Context, in *ProductRequest, opts ...grpc.CallOption) (*ProductResponse, error) { out := new(ProductResponse) err := c.cc.Invoke(ctx, "/service.ProdService/GetProductStock", in, out, opts...) if err != nil { return nil, err } return out, nil }
在 call.go 内部的 invoke() 方法中,grpc 主要做了三件事情:建立获取连接(newClientStream函数),发送数据(SendMSg()函数),接收数据(RecvMsg()函数)
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply) }
上面的还是比较容易看懂的,但是深入到内部传输层的处理的时候会发现 grpc 很多晦涩难懂的逻辑,只能强忍着往下看了