Go 语言gRPC 四种通信方式的简单使用

作者: hedeqiang

发布时间: 2022-04-19 17:27:34

上次文章简单了解了下 gRPC,并实现了一个简单模式,今天说下其他的几种模式。

gRPC有四种通信方式,分别是:简单 RPC(Unary RPC)一元RPC、服务端数据流模式 (Server-side streaming RPC)、客户端数据流模式 (Client-side streaming RPC)、双向数据流模式(Bidirectional streaming RPC)。它们主要有以下特点:

gRPC 支持定义 4 种类型的服务方法,分别是简单模式、服务端数据流模式、客户端数据流模式和双向数据流模式。

  • 简单模式、一元RPC(Simple RPC、Unary RPC)是最简单的 gRPC 模式。客户端发起一次请求,服务端响应一个数据。定义格式为 rpc SayHello (HelloRequest) returns (HelloReply) {}。

file

  • 服务端数据流模式(Server-side streaming RPC):客户端发送一个请求,服务器返回数据流响应,客户端从流中读取数据直到为空。定义格式为 rpc SayHello (HelloRequest) returns (stream HelloReply) {}。

file

  • 客户端数据流模式(Client-side streaming RPC):客户端将消息以流的方式发送给服务器,服务器全部处理完成之后返回一次响应。定义格式为 rpc SayHello (stream HelloRequest) returns (HelloReply) {}。

file

  • 双向数据流模式(Bidirectional streaming RPC):客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互 RPC 框架原理。定义格式为 rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}。

file

大概四种模式我们了解了,接下来使用 Go 语言简单实现一下,这里我们采用上次的代码。保持其目录结构即可。

.
├── README.md
├── api
│   └── user
│       ├── user.pb.go
│       └── user.proto
├── client
│   └── client.go
├── go.mod
├── go.sum
└── server
    └── server.go

其实上节已经实现了一个简单模式,但是为了保持四种模式都有,所以还是简单演示一下吧

简单模式

重新创建一个 proto 文件,我们放在 api/simple/simple.proto

syntax = "proto3";
option go_package = "api/simple";

package simple;

message SimpleRequest {
  string name = 1;
}

message SimpleResponse {
  string message = 1;
}

service SimpleService {
  rpc Get(SimpleRequest) returns (SimpleResponse) {}
}

生成 go 文件

protoc --go_out=. --go_opt=paths=source_relative \
   --go-grpc_out=. --go-grpc_opt=paths=source_relative \
   api/simple/simple.proto 

接下来去编写 Server 端代码,

还是如下的步骤:

  1. 通过 net.Listen(...) 监听客户端的请求
  2. 通过 grpc.NewServer() 创建一个 gRPC Server 实例
  3. 通过 pb.RegisterUserServiceServer(s, &User{}) 将该服务注册到 gRPC 框架中
  4. 通过 s.Serve(listen) 启动 gRPC 服务

创建 server/simple.go

package main

import (
    "context"
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/simple"
    "google.golang.org/grpc"
    "log"
    "net"
)

type Simple struct {
    Name string
    pb.UnimplementedSimpleServiceServer
}

func (s *Simple) Get(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
    name := req.GetName()
    return &pb.SimpleResponse{
        Message: "Hello " + name,
    }, nil
}

func main() {
    fmt.Println("start server")

    listen, err := net.Listen("tcp", ":8989")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterSimpleServiceServer(s, &Simple{})
    if err := s.Serve(listen); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

接下来就是创建 Client 了,client 就相对简单了,通过 grpc.Dial(xxx) 来创建连接,通过 pb.NewUserServiceClient(conn)来创建客户端对象

package main

import (
    "context"
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/simple"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "log"
)

func main() {
    fmt.Println("client start...")
    conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("fail to dial: %v", err)
    }
    defer conn.Close()
    client := pb.NewSimpleServiceClient(conn)

    response, err := client.Get(context.Background(), &pb.SimpleRequest{Name: "hedeqiang"})
    if err != nil {
        log.Fatalf("fail to call: %v", err)
    }
    fmt.Println(response.GetMessage())
}

服务端数据流模式(Server-side streaming RPC)

简单来讲服务端数据流模式发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据。

好了,我们继续来编写代码,首先是服务端。创建 proto 文件api/server_side/server_side.proto

syntax = "proto3";
option go_package = "api/server-side;server_side";

package server_side;

message GetUserRequest {
  string user_id = 1;
}

message GetUserResponse {
  string user_id = 1;
  string name = 2;
  string email = 3;
}

service ServerSide {
  rpc GetUser(GetUserRequest) returns (stream GetUserResponse) {}
}

可以看到 服务端数据流模式 是返回的是一个 stream

生成Go文件

protoc --go_out=. --go_opt=paths=source_relative \
   --go-grpc_out=. --go-grpc_opt=paths=source_relative \
   api/server_side/server_side.proto

编写服务端代码,创建文件server/server_side.go

package main

import (
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/server_side"
    "google.golang.org/grpc"
    "log"
    "net"
    "time"
)

type ServerSide struct {
    pb.UnimplementedServerSideServer
}

func (s *ServerSide) GetUser(req *pb.GetUserRequest, stream pb.ServerSide_GetUserServer) error {
    fmt.Println("GetUser")
    for i := 0; i < 10; i++ {
        time.Sleep(time.Second)
        err := stream.Send(&pb.GetUserResponse{
            UserId: fmt.Sprintf("%d", i),
            Name:   "name" + fmt.Sprintf("%d", i),
            Email:  fmt.Sprintf("laravel_code@163.com"),
        })
        if err != nil {
            log.Fatalf("%v.Send(%v) = %v", stream, req, err)
        }
    }
    return nil
}

func main() {
    fmt.Println("start server")

    listen, err := net.Listen("tcp", ":8989")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterServerSideServer(s, &ServerSide{})
    if err := s.Serve(listen); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

关键代码讲解 GetUser 方法格式其实是固定的,我们在 定义 proto 文件的时候就已经固定了

func (s ServerSide) GetUser(req *pb.GetUserRequest, stream pb.ServerSide_GetUserServer) error {}

其次因为是流传输,那么其实就是不断调用 Send 方法去发送数据

stream.Send(xxx)

为了演示效果 我使用 time.Sleep(time.Second) 稍微休息一下。方便客户端看到具体效果,否则太快了。。

明白了服务端代码主要区别在于 GetUser 是一个流式传输,需要不断的去 Send。因此客户端同理,需要不断的接收 Recv 即可。

客户端代码 ,创建 client/server_side.go

package main

import (
    "context"
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/server_side"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "io"
    "log"
)

func main() {
    fmt.Println("client start...")
    conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("fail to dial: %v", err)
    }
    defer conn.Close()
    client := pb.NewServerSideClient(conn)

    user, err := client.GetUser(context.Background(), &pb.GetUserRequest{UserId: "1"})
    if err != nil {
        log.Fatalf("fail to get user: %v", err)
    }
    for {
        resp, err := user.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("fail to recv: %v", err)
        }
        fmt.Println(resp)
    }
}

需要注意的是,我们需要判断什么时候结束,也就是 err == io.EOF 那么我们就需要 break 掉.

客户端关键就是调用一个 Recv() 方法

接下来分别启动这两个客户端

go run server/server_side.go
go run client/server_side.go

不出意外的话,我们在客户端会间隔一秒输出信息

file

客户端数据流模式(Client-side streaming RPC)

客户端将消息以流的方式发送给服务器,服务器全部处理完成之后返回一次响应。定义格式为 rpc SayHello (stream HelloRequest) returns (HelloReply) {}。

同样继续创建一个 proto 文件,放在 api/client_side/client_side.proto

syntax = "proto3";
option go_package = "api/client;client_side";

package client_side;

message GetUserRequest {
  string user_id = 1;
}

message GetUserResponse {
  string user_id = 1;
  string name = 2;
  string email = 3;
}

service ServerSide {
  rpc GetUser(stream GetUserRequest) returns (GetUserResponse) {}
}

生成 Go 文件

 protoc --go_out=. --go_opt=paths=source_relative \
   --go-grpc_out=. --go-grpc_opt=paths=source_relative \
   api/client_side/client_side.proto 

编写服务端代码

package main

import (
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/client_side"
    "google.golang.org/grpc"
    "io"
    "log"
    "net"
)

type ClientSide struct {
    Name string
    pb.UnimplementedClientSideServer
}

func (c *ClientSide) GetUser(stream pb.ClientSide_GetUserServer) error {
    for {
        var res pb.GetUserResponse
        res.UserId = "123"
        res.Name = "hedeqiang"
        res.Email = "laravel_code@163.com"

        recv, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&res)
        }
        if err != nil {
            log.Fatalf("%v.GetUser(_) = _, %v", c, err)
        }
        fmt.Println(recv.GetUserId())

    }
}

func main() {
    fmt.Println("start server")

    listen, err := net.Listen("tcp", ":8989")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterClientSideServer(s, &ClientSide{})
    if err := s.Serve(listen); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }

通过调用 stream.Recv() 进行接收客户端的数据,当碰到 io.EOF 则表明读取完毕,调用 stream.SendAndClose(xx) 方法将最终的结果发送给客户端。

客户端代码:

package main

import (
    "context"
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/client_side"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "log"
    "strconv"
    "time"
)

func main() {
    fmt.Println("client start...")
    conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("fail to dial: %v", err)
    }
    defer conn.Close()
    client := pb.NewClientSideClient(conn)

    user, err := client.GetUser(context.Background())
    if err != nil {
        return
    }
    if err != nil {
        log.Fatalf("fail to get user: %v", err)
    }
    i := 0
    for {
        if i > 10 {
            break
        }
        i++
        time.Sleep(time.Second)
        err := user.Send(&pb.GetUserRequest{
            UserId: strconv.Itoa(i),
        })
        fmt.Println("send:", i)
        if err != nil {
            log.Fatalf("fail to send: %v", err)
        }
    }

    resp, err := user.CloseAndRecv()
    if err != nil {
        log.Fatalf("fail to close: %v", err)
    }
    fmt.Println(resp)
}

调用 Send() 方法,不断向和后端发送数据,当发送完毕以后调用 CloseAndRecv() 方法 关闭请求拿到服务端返回的数据。

双向数据流模式(Bidirectional streaming RPC)

双向数据流模式,顾名思义是双向流,客户端以流式的方式发起请求,服务端同样以流式的方式响应请求

新建api/bidirectional/bidirectional.proto 文件

syntax = "proto3";
option go_package = "api/bidirectional;bidirectional";

package bidirectional;

message GetUserRequest {
  string user_id = 1;
}

message GetUserResponse {
  string user_id = 1;
  string name = 2;
  string email = 3;
}

service BidirectionalService {
  rpc GetUser(stream GetUserRequest) returns (stream GetUserResponse) {}
}

生成 Go 代码

protoc --go_out=. --go_opt=paths=source_relative \
  --go-grpc_out=. --go-grpc_opt=paths=source_relative \
  api/bidirectional/bidirectional.proto 

编写服务端代码 server/bidirectional.go

package main

import (
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/bidirectional"
    "google.golang.org/grpc"
    "log"
    "net"
)

type BidirectionalService struct {
    pb.UnimplementedBidirectionalServiceServer
}

func (b *BidirectionalService) GetUser(stream pb.BidirectionalService_GetUserServer) (err error) {
    for {
        req, err := stream.Recv()
        if err != nil {
            log.Printf("stream.Recv error: %v", err)
            return err
        }
        log.Printf("stream.Recv req: %v", req)
        err = stream.Send(&pb.GetUserResponse{
            UserId: "123",
            Name:   "hedeqiang",
            Email:  "laravel_code@163.com",
        })
        if err != nil {
            log.Printf("stream.Send error: %v", err)
            return err
        }
    }

}

func main() {
    fmt.Println("start server")

    listen, err := net.Listen("tcp", ":8989")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterBidirectionalServiceServer(s, &BidirectionalService{})
    if err := s.Serve(listen); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

客户端 client/bidirectional.go

package main

import (
    "context"
    "fmt"
    pb "github.com/hedeqiang/grpc-demo/api/bidirectional"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "log"
    "strconv"
    "time"
)

func main() {
    fmt.Println("client start...")
    conn, err := grpc.Dial("localhost:8989", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("fail to dial: %v", err)
    }
    defer conn.Close()
    client := pb.NewBidirectionalServiceClient(conn)

    user, err := client.GetUser(context.Background())
    if err != nil {
        log.Fatalf("fail to get user: %v", err)
    }

    for i := 0; i < 10; i++ {
        time.Sleep(time.Second)
        err := user.Send(&pb.GetUserRequest{UserId: strconv.Itoa(i)})
        if err != nil {
            log.Fatalf("fail to send: %v", err)
        }

        resp, err := user.Recv()
        if err != nil {
            log.Fatalf("fail to recv: %v", err)
        }
        fmt.Printf("%v\n", resp)
    }

    err = user.CloseSend()
    if err != nil {
        log.Fatalf("fail to close send: %v", err)
    }
}

其实可以配合 go 、channel 进行处理 RecvSend

以上代码上传到 https://github.com/hedeqiang/learn-grpc 。可能写得不对,还望大佬们纠正。

关于极客返利

极客返利 是由我个人开发的一款网课返利、返现平台。包含 极客时间返现、拉勾教育返现、掘金小册返现、GitChat返现。目前仅包含这几个平台。后续如果有需要可以考虑其他平台。 简而言之就是:你买课,我返现。让你花更少的钱,就可以买到课程。

https://geek.laravelcode.cn

https://geek.idaka.ink

版权许可

本作品采用 知识共享署名 4.0 国际许可协议 进行许可。

转载无需与我联系,但须注明出处,注明文章来源 Go 语言gRPC 四种通信方式的简单使用