分享
  1. 首页
  2. 文章

go-grpc-流式接口(streaming rpc)

旧梦发癫 · · 2369 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

上一篇我们介绍了rpc最基本的应用,今天我们来看看rpc的另外一个数据交互方式streaming rpc,也就是流式接口。

streaming rpc相比于simple rpc来说可以很好的解决一个接口发送大量数据的场景。

比如一个订单导出的接口有20万条记录,如果使用simple rpc来实现的话。那么我们需要一次性接收到20万记录才能进行下一步的操作。但是如果我们使用streaming rpc那么我们就可以接收一条记录处理一条记录,直到所以的数据传输完毕。这样可以较少服务器的瞬时压力,也更有及时性

下面们来看看streaming rpc具体是怎么交互的。

IDL

syntax = "proto3";
package proto;
message Order {
 int32 id = 1;
 string orderSn = 2;
 string date = 3;
}
message OrderList{
 Order order = 1;
}
message OrderSearchParams {
}
message Image{
 string fileName = 1;
 string file = 2;
}
message ImageList{
 Image image = 1;
}
message uploadResponse{
}
message SumData{
 int32 number = 1;
}
service StreamService {
 rpc OrderList(OrderSearchParams) returns (stream OrderList){}; //服务端流式
 rpc UploadFile(stream ImageList) returns (uploadResponse){}; //客户端流式
 rpc SumData(stream SumData) returns (stream SumData){}; //双向流式
}

这里定义了三个方法

  • OrderList 服务器流式,客户端普通rpc调用
  • UploadFile 客户端流式,服务端普通rpc
  • SumData 双向流式

基础结构

server
package main
import (
 "google.golang.org/grpc"
 "iris-grpc-example/proto"
 "log"
 "net"
)
type StreamServices struct {}
func main() {
 server := grpc.NewServer()
 proto.RegisterStreamServiceServer(server, &StreamServices{})
 lis, err := net.Listen("tcp", "127.0.0.1:9528")
 if err != nil {
 log.Fatalf("net.Listen err: %v", err)
 }
 server.Serve(lis)
}
func (services *StreamServices)OrderList(params *proto.OrderSearchParams,stream proto.StreamService_OrderListServer) error {
 return nil
}
func (services *StreamServices)UploadFile(stream proto.StreamService_UploadFileServer) error {
 return nil
}
func (services *StreamServices)SumData(stream proto.StreamService_SumDataServer) error {
 return nil
}
clietn
package main
import (
 "github.com/kataras/iris/v12"
 "google.golang.org/grpc"
 "iris-grpc-example/proto"
 "log"
)
var streamClient proto.StreamServiceClient
func main() {
 app := iris.New()
 app.Logger().SetLevel("debug") //debug
 app.Handle("GET", "/testOrderList", orderList)
 app.Handle("GET", "/testUploadImage", uploadImage)
 app.Handle("GET", "/testSumData", sumData)
 app.Run(iris.Addr("127.0.0.1:8080"))
}
func init() {
 connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure())
 if err != nil {
 log.Fatalln(err)
 }
 streamClient = proto.NewStreamServiceClient(connect)
}
func orderList(ctx iris.Context) {
 
}
func uploadImage(ctx iris.Context) {
}
func sumData(ctx iris.Context) {
}

按照proto中的约定,先实现接口并注册一个服务。接下来我们依次来实现三个不同的流式方法。

服务端流式 orderList
server
func (services *StreamServices) OrderList(params *proto.OrderSearchParams, stream proto.StreamService_OrderListServer) error {
 for i := 0; i <= 10; i++ {
 order := proto.Order{
 Id: int32(i),
 OrderSn: time.Now().Format("20060102150405") + "order_sn",
 Date: time.Now().Format("2006年01月02日 15:04:05"),
 }
 err := stream.Send(&proto.StreamOrderList{
 Order: &order,
 })
 if err != nil {
 return err
 }
 }
 return nil
}

gRPC为我们提供一个流的发送方法。send,这样我们可以很简单的以流的方式传递数据。
现在我们来查看streaming.pb.go中的send

func (x *streamServiceOrderListServer) Send(m *StreamOrderList) error {
 return x.ServerStream.SendMsg(m)
}

可以看到最终是使用ServerStream.SendMsg,查看源码,可以发现,最终是使用了一个结构体。

type serverStream struct {
 ctx context.Context
 ......
 maxReceiveMessageSize int
 maxSendMessageSize int
 ......
}

这里我们关心两个值,最大可接收大小,最大发送大小。而再SendMsg中也有对于的大小判断,所以发送的消息大小不是无限制的。

// TODO(dfawley): should we be checking len(data) instead?
 if len(payload) > ss.maxSendMessageSize {
 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
 }

我们可以服务端创建server的时候通过server := grpc.NewServer(grpc.MaxSendMsgSize())来指定大小,有可以在客服端创建client的时候通过connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithInsecure(),grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize()))的时候来指定,具体大家可以下来自己详细了解下配置项。

client
func orderList(ctx iris.Context) {
 stream, err := streamClient.OrderList(context.Background(), &proto.OrderSearchParams{})
 if err != nil {
 ctx.JSON(map[string]string{
 "err": err.Error(),
 })
 return
 }
 for {
 res, err := stream.Recv()
 if err == io.EOF {
 break
 }
 if err != nil {
 ctx.JSON(map[string]string{
 "err": err.Error(),
 })
 return
 }
 ctx.JSON(res)
 log.Println(res)
 }
}

这里在for循环中去读取数据,直到取到一个io.EOF的结束错误占位符。

客户端流式 uploadImage
server
func (services *StreamServices) UploadFile(stream proto.StreamService_UploadFileServer) error {
 for {
 res,err := stream.Recv()
 //接收消息结束,发送结果,并关闭
 if err == io.EOF {
 return stream.SendAndClose(&proto.UploadResponse{})
 }
 if err !=nil {
 return err
 }
 fmt.Println(res)
 }
 return nil
}

可以看到这里我们同样使用for结合stream.Recv()来接收数据流,但是这里我们多一个SendAndClose,表示服务器已经接收消息结束,并发生一个正确的响应给客户端。

client
func uploadImage(ctx iris.Context) {
 stream,err := streamClient.UploadFile(context.Background())
 if err != nil {
 ctx.JSON(map[string]string{
 "err": err.Error(),
 })
 return
 }
 for i:=1;i<=10 ; i++ {
 img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
 images := &proto.StreamImageList{Image:img}
 err := stream.Send(images)
 if err != nil {
 ctx.JSON(map[string]string{
 "err": err.Error(),
 })
 return
 }
 }
 //发送完毕 关闭并获取服务端返回的消息
 resp, err := stream.CloseAndRecv()
 if err != nil {
 ctx.JSON(map[string]string{
 "err": err.Error(),
 })
 return
 }
 ctx.JSON(map[string]interface{}{"result": resp,"message":"success"})
 log.Println(resp)
}

而在客户端发送数据完毕的时候需要使用CloseAndRecv 需要接收服务端接收完毕的通知以及关闭当前通道。

双向流式 uploadImage
server
func (services *StreamServices) SumData(stream proto.StreamService_SumDataServer) error {
 i := 0
 for {
 err := stream.Send(&proto.StreamSumData{Number: int32(i)})
 if err != nil {
 return err
 }
 res, err := stream.Recv()
 if err == io.EOF {
 return nil
 }
 log.Printf("res:%d,i:%d,sum:%d\r\n", res.Number, i, int32(i)+res.Number)
 i++
 }
}

服务端在发送消息的同时,并接收服务端发送的消息。

client
func sumData(ctx iris.Context) {
 stream, err := streamClient.SumData(context.Background())
 if err != nil {
 ctx.JSON(map[string]string{
 "err": err.Error(),
 })
 return
 }
 for i := 1; i <= 10; i++ {
 err = stream.Send(&proto.StreamSumData{Number: int32(i)})
 if err == io.EOF {
 break
 }
 if err != nil {
 return
 }
 res, err := stream.Recv()
 if err == io.EOF {
 break
 }
 if err != nil {
 return
 }
 log.Printf("res number:%d", res.Number)
 }
 stream.CloseSend()
 return
}

上面我们可以看到。客户端有一个执行断开连接的标识CloseSend(),而服务器没有,因为服务端断开连接是隐式的,我们只需要退出循环即可断开连接。可以灵活的控制。

上面就是go-gRPC的流式接口。只是一个简单的例子。如有不妥的地方欢迎指出。感谢。

学习于-煎鱼

期待一起交流

qrcode_for_gh_60813539dc23_258.jpg


有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:旧梦发癫

查看原文:go-grpc-流式接口(streaming rpc)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
2369 次点击 ∙ 1 赞
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏