gRPC的那些事 - interceptor
smallnest · · 2704 次点击 · · 开始浏览gRPC-Go 增加了拦截器(interceptor)的功能, 就像Java Servlet中的 filter一样,可以对RPC的请求和响应进行拦截处理,而且既可以在客户端进行拦截,也可以对服务器端进行拦截。
利用拦截器,可以对gRPC进行扩展,利用社区的力量将gRPC发展壮大,也可以让开发者更灵活地处理gRPC流程中的业务逻辑。下面列出了利用拦截器实现的一些功能框架:
- Go gRPC Middleware:提供了拦截器的interceptor链式的功能,可以将多个拦截器组合成一个拦截器链,当然它还提供了其它的功能,所以以gRPC中间件命名。
- grpc-multi-interceptor: 是另一个interceptor链式功能的库,也可以将单向的或者流式的拦截器组合。
- grpc_auth: 身份验证拦截器
- grpc_ctxtags: 为上下文增加
Tagmap对象 - grpc_zap: 支持
zap日志框架 - grpc_logrus: 支持
logrus日志框架 - grpc_prometheus: 支持
prometheus - otgrpc: 支持opentracing/zipkin
- grpc_opentracing:支持opentracing/zipkin
- grpc_retry: 为客户端增加重试的功能
- grpc_validator: 为服务器端增加校验的功能
- xrequestid: 将request id 设置到context中
- go-grpc-interceptor: 解析
Accept-Language并设置到context - requestdump: 输出request/response
也有其它一些文章介绍的利用拦截器的例子,如下面的两篇文章:
Introduction to OAuth on gRPC、gRPC实践 拦截器 Interceptor
相信会有更多有趣的拦截器被贡献出来。
注意,服务器只能配置一个 unary interceptor和 stream interceptor,否则会报错,客户端也是,虽然不会报错,但是只有最后一个才起作用。 如果你想配置多个,可以使用前面提到的拦截器链或者自己实现一个。
实现拦截器麻烦吗?一点都不麻烦,相反,非常的简单。
对于服务器端的单向调用的拦截器,只需定义一个UnaryServerInterceptor方法:
1
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
对于服务器端stream调用的拦截器,只需定义一个StreamServerInterceptor方法:
1
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
方法的参数中包含了上下文,请求和stream以及要调用对象的信息。
对于客户端的单向的拦截,只需定义一个``方法:
1
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
对于客户端的stream的拦截,只需定义一个``方法:
1
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
你可以查看上面提到的一些开源的拦截器的实现,它们的实现都不是太复杂,下面我们以一个简单的例子来距离,在方法调用的前后打印一个log。
Server端的拦截器
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
package mainimport ("log""net""flag"pb "github.com/smallnest/grpc/a/pb""golang.org/x/net/context""google.golang.org/grpc""google.golang.org/grpc/reflection")var (port = flag.String("p", ":8972", "port"))type server struct{}func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {return &pb.HelloReply{Message: "Hello " + in.Name}, nil}func main() {flag.Parse()lis, err := net.Listen("tcp", *port)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer(grpc.StreamInterceptor(StreamServerInterceptor),grpc.UnaryInterceptor(UnaryServerInterceptor))pb.RegisterGreeterServer(s, &server{})reflection.Register(s)if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}}func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {log.Printf("before handling. Info: %+v", info)resp, err := handler(ctx, req)log.Printf("after handling. resp: %+v", resp)return resp, err}// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {log.Printf("before handling. Info: %+v", info)err := handler(srv, ss)log.Printf("after handling. err: %v", err)return err}
grpc.NewServer可以将拦截器作为参数传入,在提供服务的时候,我们可以看到拦截器打印出log:
12
2017/04/17 23:34:20 before handling. Info: &{Server:0x17309c8 FullMethod:/pb.Greeter/SayHello}2017/04/17 23:34:20 after handling. resp: &HelloReply{Message:Hello world,}
客户端的拦截器
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
package mainimport (//"context""flag""log""golang.org/x/net/context"pb "github.com/smallnest/grpc/a/pb""google.golang.org/grpc")var (address = flag.String("addr", "localhost:8972", "address")name = flag.String("n", "world", "name"))func main() {flag.Parse()// 连接服务器conn, err := grpc.Dial(*address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(UnaryClientInterceptor),grpc.WithStreamInterceptor(StreamClientInterceptor))if err != nil {log.Fatalf("faild to connect: %v", err)}defer conn.Close()c := pb.NewGreeterClient(conn)r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.Message)}func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {log.Printf("before invoker. method: %+v, request:%+v", method, req)err := invoker(ctx, method, req, reply, cc, opts...)log.Printf("after invoker. reply: %+v", reply)return err}func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {log.Printf("before invoker. method: %+v, StreamDesc:%+v", method, desc)clientStream, err := streamer(ctx, desc, cc, method, opts...)log.Printf("before invoker. method: %+v", method)return clientStream, err}
通过grpc.WithUnaryInterceptor、grpc.WithStreamInterceptor可以将拦截器传递给Dial做参数。在客户端调用的时候,可以查看拦截器输出的日志:
123
2017/04/17 23:34:20 before invoker. method: /pb.Greeter/SayHello, request:&HelloRequest{Name:world,}2017/04/17 23:34:20 after invoker. reply: &HelloReply{Message:Hello world,}2017/04/17 23:34:20 Greeting: Hello world
通过这个简单的例子,你可以很容易的了解拦截器的开发。unary和stream两种类型的拦截器可以根据你的gRPC server/client实现的不同,有选择的实现。
有疑问加站长微信联系(非本文作者)
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
关注微信- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码` - 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传
收入到我管理的专栏 新建专栏
gRPC-Go 增加了拦截器(interceptor)的功能, 就像Java Servlet中的 filter一样,可以对RPC的请求和响应进行拦截处理,而且既可以在客户端进行拦截,也可以对服务器端进行拦截。
利用拦截器,可以对gRPC进行扩展,利用社区的力量将gRPC发展壮大,也可以让开发者更灵活地处理gRPC流程中的业务逻辑。下面列出了利用拦截器实现的一些功能框架:
- Go gRPC Middleware:提供了拦截器的interceptor链式的功能,可以将多个拦截器组合成一个拦截器链,当然它还提供了其它的功能,所以以gRPC中间件命名。
- grpc-multi-interceptor: 是另一个interceptor链式功能的库,也可以将单向的或者流式的拦截器组合。
- grpc_auth: 身份验证拦截器
- grpc_ctxtags: 为上下文增加
Tagmap对象 - grpc_zap: 支持
zap日志框架 - grpc_logrus: 支持
logrus日志框架 - grpc_prometheus: 支持
prometheus - otgrpc: 支持opentracing/zipkin
- grpc_opentracing:支持opentracing/zipkin
- grpc_retry: 为客户端增加重试的功能
- grpc_validator: 为服务器端增加校验的功能
- xrequestid: 将request id 设置到context中
- go-grpc-interceptor: 解析
Accept-Language并设置到context - requestdump: 输出request/response
也有其它一些文章介绍的利用拦截器的例子,如下面的两篇文章:
Introduction to OAuth on gRPC、gRPC实践 拦截器 Interceptor
相信会有更多有趣的拦截器被贡献出来。
注意,服务器只能配置一个 unary interceptor和 stream interceptor,否则会报错,客户端也是,虽然不会报错,但是只有最后一个才起作用。 如果你想配置多个,可以使用前面提到的拦截器链或者自己实现一个。
实现拦截器麻烦吗?一点都不麻烦,相反,非常的简单。
对于服务器端的单向调用的拦截器,只需定义一个UnaryServerInterceptor方法:
1
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
对于服务器端stream调用的拦截器,只需定义一个StreamServerInterceptor方法:
1
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
方法的参数中包含了上下文,请求和stream以及要调用对象的信息。
对于客户端的单向的拦截,只需定义一个``方法:
1
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
对于客户端的stream的拦截,只需定义一个``方法:
1
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
你可以查看上面提到的一些开源的拦截器的实现,它们的实现都不是太复杂,下面我们以一个简单的例子来距离,在方法调用的前后打印一个log。
Server端的拦截器
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
package mainimport ("log""net""flag"pb "github.com/smallnest/grpc/a/pb""golang.org/x/net/context""google.golang.org/grpc""google.golang.org/grpc/reflection")var (port = flag.String("p", ":8972", "port"))type server struct{}func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {return &pb.HelloReply{Message: "Hello " + in.Name}, nil}func main() {flag.Parse()lis, err := net.Listen("tcp", *port)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer(grpc.StreamInterceptor(StreamServerInterceptor),grpc.UnaryInterceptor(UnaryServerInterceptor))pb.RegisterGreeterServer(s, &server{})reflection.Register(s)if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}}func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {log.Printf("before handling. Info: %+v", info)resp, err := handler(ctx, req)log.Printf("after handling. resp: %+v", resp)return resp, err}// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {log.Printf("before handling. Info: %+v", info)err := handler(srv, ss)log.Printf("after handling. err: %v", err)return err}
grpc.NewServer可以将拦截器作为参数传入,在提供服务的时候,我们可以看到拦截器打印出log:
12
2017/04/17 23:34:20 before handling. Info: &{Server:0x17309c8 FullMethod:/pb.Greeter/SayHello}2017/04/17 23:34:20 after handling. resp: &HelloReply{Message:Hello world,}
客户端的拦截器
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
package mainimport (//"context""flag""log""golang.org/x/net/context"pb "github.com/smallnest/grpc/a/pb""google.golang.org/grpc")var (address = flag.String("addr", "localhost:8972", "address")name = flag.String("n", "world", "name"))func main() {flag.Parse()// 连接服务器conn, err := grpc.Dial(*address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(UnaryClientInterceptor),grpc.WithStreamInterceptor(StreamClientInterceptor))if err != nil {log.Fatalf("faild to connect: %v", err)}defer conn.Close()c := pb.NewGreeterClient(conn)r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.Message)}func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {log.Printf("before invoker. method: %+v, request:%+v", method, req)err := invoker(ctx, method, req, reply, cc, opts...)log.Printf("after invoker. reply: %+v", reply)return err}func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {log.Printf("before invoker. method: %+v, StreamDesc:%+v", method, desc)clientStream, err := streamer(ctx, desc, cc, method, opts...)log.Printf("before invoker. method: %+v", method)return clientStream, err}
通过grpc.WithUnaryInterceptor、grpc.WithStreamInterceptor可以将拦截器传递给Dial做参数。在客户端调用的时候,可以查看拦截器输出的日志:
123
2017/04/17 23:34:20 before invoker. method: /pb.Greeter/SayHello, request:&HelloRequest{Name:world,}2017/04/17 23:34:20 after invoker. reply: &HelloReply{Message:Hello world,}2017/04/17 23:34:20 Greeting: Hello world
通过这个简单的例子,你可以很容易的了解拦截器的开发。unary和stream两种类型的拦截器可以根据你的gRPC server/client实现的不同,有选择的实现。