Golang gRPC unary, stream, bidirectional RPC examples

eye-catch Golang

I’ve written the following article before to know how to start using gRPC in devcontainer.

In this article, we’ll learn further to know other types of gRPC functions.

You can clone my GitHub repository and try it yourself.

Sponsored links

Imported libraries

These are the libraries used in the following code. This section is omitted in each example.

Server

package server

import (
    "bufio"
    "context"
    "errors"
    "fmt"
    "io"
    "log"
    "math/rand"
    "os"
    "path/filepath"
    "time"

    rpc "apitest/internal/proto"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"
)

const (
    Bit64        = 64
    DEC          = 10
    resourcePath = "./internal/server/resources/"
)

Client

package client

import (
    "bufio"
    "context"
    "errors"
    "io"
    "log"
    "math/rand"
    "os"
    "path/filepath"
    "strings"
    "time"

    rpc "apitest/internal/proto"

    "apitest/internal/common"

    "google.golang.org/grpc"
)

const (
    resourcePath = "./internal/client/resources/"
)

Proto file

syntax = "proto3";

import "google/protobuf/timestamp.proto";

option go_package = "api-test/grpc/apitest";
Sponsored links

Unary RPC

We’ll add a SayHello function that requires a parameter.

Definition in a proto file

Let’s add SayHello to Middle service. The request/response parameter has respectively only one property.

service Middle {
  // Unary RPC
  rpc Ping(PingRequest) returns (PingResponse) {}
  // Unary RPC
  rpc SayHello(HelloRequest) returns (HelloResponse) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloResponse {
  string message = 1;
}

Server implementation

What we do is to generate a message with the specified name. But we always have to create a new response instance by using the predefined struct HellowResponse.

type GrpcCallHandler struct {
    rpc.UnimplementedMiddleServer
}

func (s *GrpcCallHandler) SayHello(ctx context.Context, req *rpc.HelloRequest) (*rpc.HelloResponse, error) {
    response := rpc.HelloResponse{
        Message: fmt.Sprintf("Hello %s", req.GetName()),
    }

    return &response, nil
}

Set error if it needs to notify an error.

Client implementation

The client-side always has to pass context to call a RPC function. I recommend setting a reasonable timeout for the call, otherwise, gRPC library might tell you the connection lost 15 minutes later in some environment.

type MiddleMan struct {
    conn *grpc.ClientConn
}

func NewMiddleMan(conn *grpc.ClientConn) *MiddleMan {
    return &MiddleMan{
        conn: conn,
    }
}

func (m *MiddleMan) Greet(ctx context.Context, name string) {
    timeoutCtx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()

    client := rpc.NewMiddleClient(m.conn)
    res, err := client.SayHello(timeoutCtx, &rpc.HelloRequest{Name: name})
    if err != nil {
        log.Printf("[ERROR] could not greet: %v\n", err)
        return
    }

    log.Printf("Greeting: %s\n", res.GetMessage())
}

A unary function is a blocking call that can be used in the same way as a normal function call. The only difference is that the return value is always wrapped with a struct. Remember that the request/response parameters are defined in a message keyword in a proto file.

Since the property could be nil, it would be better to use GetXxxx function instead of accessing Message property directly.

The Greet function is called in a main function here.

func main() {
    grpcConn, err := grpc.Dial(
        serverHost,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer grpcConn.Close()

    middleMan := client.NewMiddleMan(grpcConn)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    middleMan.Greet(ctx, "Yuto")
}

Result

Start the server first.

$ make runServer 
2023/05/24 05:01:44 start gRPC server

Then start the client in a different terminal.

$ make runClient 
2023/05/24 05:02:18 Greeting: Hello Yuto

The expected message is returned.

Server streaming RPC to Download a file

Server streaming RPC can be used if the server needs to send data continuously to a client but a client doesn’t need to send the response. Downloading a file is a good example.

Definition in a proto file

stream keyword needs to be added to the return value for a server streaming RPC. Other things are the same as Unary RPC.

service Middle {
  // Unary RPC
  rpc Ping(PingRequest) returns (PingResponse) {}
  // Unary RPC
  rpc SayHello(HelloRequest) returns (HelloResponse) {}
  // Server Streaming RPC
  rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
}

message DownloadRequest {
  string filename = 1;
}

message DownloadResponse {
  string line = 1;
}

To restrict the download path, a client has to send only a filename. The server sends one line by one line, so the response is a single-line string. A client-side needs to concatenate it to generate the completed file context.

Server implementation

To download a file from a fixed folder, an absolute path needs to be generated first. Once a file is opened, it sends the content one line by one line.

const (
    resourcePath = "./internal/server/resources/"
)

func (s *GrpcCallHandler) Download(
    req *rpc.DownloadRequest,
    stream rpc.Middle_DownloadServer,
) error {
    absPath, err := filepath.Abs(filepath.Join(resourcePath, req.Filename))
    if err != nil {
        return fmt.Errorf("failed to get absolute path: %w", err)
    }

    file, err := os.Open(absPath)
    if err != nil {
        log.Println(absPath)
        return fmt.Errorf("failed to open the file: %w", err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        stream.Send(&rpc.DownloadResponse{Line: scanner.Text()})
        <-time.After(time.Second)
    }
    return nil
}

As you can see above, an error can be returned in a normal way.

Client implementation

The implementation until the function call is the same as the unary RPC function.

// Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (Middle_DownloadClient, error)
func (m *MiddleMan) Download(ctx context.Context, filename string) {
    timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    client := rpc.NewMiddleClient(m.conn)
    // log.Printf("start receiving a file [%s]", name)

    stream, err := client.Download(timeoutCtx, &rpc.DownloadRequest{Filename: filename})
    if err != nil {
        log.Printf("[ERROR] failed to create a stream for Download: %v\n", err)
        return
    }

    lines := []string{}
    for {
        res, err := stream.Recv()
        if errors.Is(err, io.EOF) {
            break
        }

        if err != nil {
            log.Printf("[ERROR] failed to receive data for [%s]: %v\n", filename, err)
            break
        }
        log.Println(res.GetLine())
        lines = append(lines, res.GetLine())
    }

    log.Printf("File content is as follows\n%s\n", strings.Join(lines, "\n"))
}

The return value of the function call is stream instead of a response. Recv() function needs to be called to get the next response. When the server side finishes the process, io.EOF is returned to the error variable.

Let’s call the function twice. A filename doesn’t exist in the server.

func main() {
    grpcConn, err := grpc.Dial(
        serverHost,
        grpc.WithTransportCredentials(insecure.NewCredentials())
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer grpcConn.Close()

    middleMan := client.NewMiddleMan(grpcConn)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go middleMan.Download(ctx, "test_file.txt")
    middleMan.Download(ctx, "not_exist.txt")

    // exit by ctrl + c
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt)
    <-quit
    log.Println("exit")
}

Result

Server

$ make runServer 
2023/06/03 06:44:00 start gRPC server
2023/06/03 06:44:05 /workspaces/api-test/languages/go/internal/server/resources/not_exist.txt

Client

$ make runClient 
2023/06/03 06:44:05 aaaaaa
2023/06/03 06:44:05 [ERROR] failed to receive data for [not_exist.txt]: rpc error: code = Unknown desc = failed to open the file: open /workspaces/api-test/languages/go/internal/server/resources/not_exist.txt: no such file or directory
2023/06/03 06:44:05 File content is as follows

2023/06/03 06:44:06 bbbbbb
2023/06/03 06:44:07 cccccc
2023/06/03 06:44:08 File content is as follows
aaaaaa
bbbbbb
cccccc

The error is returned for a non-existing file. On the other hand, a single line string is returned every second for the existing file.

Client streaming RPC Upload a file

In opposite to server streaming RPC, uploading a file to a server is a good example. A client continuously sends data to the server side without any response from the server.

Definition in a proto file

Add stream keyword to the request parameter.

service Middle {
  // Unary RPC
  rpc Ping(PingRequest) returns (PingResponse) {}
  // Unary RPC
  rpc SayHello(HelloRequest) returns (HelloResponse) {}
  // Server Streaming RPC
  rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
  // Client Streaming RPC
  rpc Upload(stream UploadRequest) returns (UploadResponse) {}
}

message UploadRequest {
  string filename = 1;
  bytes chunk = 2;
}

message UploadResponse {
  bool result = 1;
  int64 writtenSize = 2;
  string message = 3;
}

If it’s a text file, data type can be string but it might be byte coded file. Therefore, we define bytes here.

Server implementation

The implementation is a bit long due to the error handling.

func (s *GrpcCallHandler) Upload(stream rpc.Middle_UploadServer) error {
    writtenSize := 0
    var file *os.File

    log.Println("Upload was triggered")

    for {
        res, err := stream.Recv()
        if err != nil {
            if errors.Is(err, io.EOF) {
                return stream.SendAndClose(&rpc.UploadResponse{
                    Result:      true,
                    WrittenSize: int64(writtenSize),
                    Message:     "COMPLETED",
                })
            }

            return status.Errorf(codes.Unknown, "[ERROR] failed to upload: %w\n", err)
        }

        if file == nil {
            if res.GetFilename() == "" {
                return status.Errorf(codes.InvalidArgument, "filename must be specified")
            }

            absPath, err := filepath.Abs(filepath.Join(resourcePath, "from_client", res.GetFilename()))
            if err != nil {
                errorMsg := fmt.Sprintf("failed to get absolute path: %v", err)
                return status.Errorf(codes.Internal, errorMsg)
            }

            file, err = os.Create(absPath)
            if err != nil {
                errorMsg := fmt.Sprintf("failed to create a file: %v", err)
                return status.Errorf(codes.PermissionDenied, errorMsg)
            }
            defer file.Close()
        } else {
            if len(res.GetChunk()) > 0 {
                log.Printf("received: %s\n", string(res.GetChunk()))

                length, err := file.Write(res.GetChunk())
                if err != nil {
                    errorMsg := fmt.Sprintf("failed to write chunk: %v", err)
                    return status.Errorf(codes.Internal, errorMsg)
                }

                writtenSize += length
            }
        }
    }
}

The important thing here is that an error can’t be returned in a normal way. If an error needs to be returned, status.Errorf() needs to be used. In other way, we can define the error code and the error text in the response message.

Client implementation

It opens a file and sends the data to the server.

func (m *MiddleMan) Upload(ctx context.Context, filename string) {
    timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    absPath, err := filepath.Abs(filepath.Join(resourcePath, filename))
    if err != nil {
        log.Printf("[ERROR] failed to get absolute path: %v", err)
        return
    }

    file, err := os.Open(absPath)
    if err != nil {
        log.Printf("failed to open the file: %v", err)
        return
    }
    defer file.Close()

    client := rpc.NewMiddleClient(m.conn)

    stream, err := client.Upload(timeoutCtx)
    if err != nil {
        log.Printf("[ERROR] failed to create a stream for Upload: %v\n", err)
        return
    }

    log.Printf("start to upload file [%s]\n", filename)
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        chunk := scanner.Text()
        log.Printf("send chunk: %s", chunk)
        if err := stream.Send(&rpc.UploadRequest{Filename: filename, Chunk: scanner.Bytes()}); err != nil {
            log.Printf("[ERROR] failed to send data: %v", err)

            common.ShowErrorMessageInTrailer(stream)

            break
        }
        <-time.After(time.Second)
    }

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Printf("failed to close: %v\n", err)
    }

    log.Printf("completed to upload file [%s]\nresult: %t\nwrittenSize: %d\nmessage: %s\n",
        filename, res.GetResult(), res.GetWrittenSize(), res.GetMessage())
}

Use Send() method to send data to the server. When all the file contents are sent to the server, we need to close the connection. CloseAndRecv() should be used to release all the resources.

If an error is met, we need to show the error. The server side returns the error by status.Errorf(). The client-side can show it in the following way.

package common

import (
    "log"

    "google.golang.org/grpc"
)

func ShowErrorMessageInTrailer(stream grpc.ClientStream) {
    trailer := stream.Trailer()
    v, exist := trailer["error"]
    if exist { // there is an error
        log.Println("Error: ", v)
    }
}

Call the Upload function.

func main() {
    grpcConn, err := grpc.Dial(
        serverHost,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer grpcConn.Close()

    middleMan := client.NewMiddleMan(grpcConn)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    middleMan.Upload(ctx, "data.txt")

    // exit by ctrl + c
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt)
    <-quit
    log.Println("exit")
}

Result A successful case

Server

$ make runServer
2023/06/03 07:12:07 start gRPC server
2023/06/03 07:22:38 Upload was triggered
2023/06/03 07:22:39 received: second line
2023/06/03 07:22:40 received: third line
2023/06/03 07:22:41 received: fourth line
2023/06/03 07:22:42 received: and so on...
2023/06/03 07:22:44 received: end line

Client

$ make runClient
2023/06/03 07:22:38 start to upload file [data.txt]
2023/06/03 07:22:38 send chunk: first line
2023/06/03 07:22:39 send chunk: second line
2023/06/03 07:22:40 send chunk: third line
2023/06/03 07:22:41 send chunk: fourth line
2023/06/03 07:22:42 send chunk: and so on...
2023/06/03 07:22:43 send chunk: 
2023/06/03 07:22:44 send chunk: end line
2023/06/03 07:22:45 completed to upload file [data.txt]
result: true
writtenSize: 52
message: COMPLETED

Result An error case

To simulate an error case, set an empty string to Filename on the client side.

for scanner.Scan() {
    chunk := scanner.Text()
    log.Printf("send chunk: %s", chunk)
    // set an empty string to Filename
    if err := stream.Send(&rpc.UploadRequest{Filename: "", Chunk: scanner.Bytes()}); err != nil {
        log.Printf("[ERROR] failed to send data: %v", err)

The result becomes as follows.

Server

$ make runServer
2023/06/03 07:28:37 start gRPC server
2023/06/03 07:28:41 Upload was triggered

Client

$ make runClient
2023/06/03 07:28:41 start to upload file [data.txt]
2023/06/03 07:28:41 send chunk: first line
2023/06/03 07:28:42 send chunk: second line
2023/06/03 07:28:42 [ERROR] failed to send data: EOF
2023/06/03 07:28:42 failed to close: rpc error: code = InvalidArgument desc = filename must be specified
2023/06/03 07:28:42 completed to upload file [data.txt]
result: false
writtenSize: 0
message: 

The error is correctly notified.

Bidirectional streaming RPC

Bidirectional streaming RPC is used when a client and a server need to communicate with each other depending on the response.

Definition in a proto file

Add stream keyword for both request/response parameters.

service Middle {
  // Unary RPC
  rpc Ping(PingRequest) returns (PingResponse) {}
  // Unary RPC
  rpc SayHello(HelloRequest) returns (HelloResponse) {}
  // Server Streaming RPC
  rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
  // Client Streaming RPC
  rpc Upload(stream UploadRequest) returns (UploadResponse) {}
  // Bidirectional Streaming RPC
  rpc Communicate(stream CommunicateRequest)
      returns (stream CommunicateResponse) {}
}

message CommunicateRequest {
  int64 max = 1;
  int64 value = 2;
}

message CommunicateResponse {
  int64 currentCount = 1;
  int64 value = 2;
}

We will implement a simple logic. A client sends the first number. A server generates a random number and returns the sum of the two numbers. A client adds a random number to the sum and sends it to the server again. It repeats the process max value’s times.

Server implementation

The server has 3 steps.

  1. Generate a random number
  2. Calculate the sum and send it to a client
  3. Receive a response from a client
func (s *GrpcCallHandler) Communicate(stream rpc.Middle_CommunicateServer) error {
    ctx := stream.Context()

    res, err := stream.Recv()
    if err != nil {
        return status.Errorf(codes.Unknown, "[ERROR] failed to communicate: %w\n", err)
    }

    maxCount := res.GetMax()
    if maxCount == 0 {
        maxCount = 3
    }
    receivedValue := res.GetValue()

    for currentCount := 0; currentCount < int(maxCount); currentCount++ {
        // 1. generate random number
        randomValue := rand.Intn(100)
        if randomValue >= 80 {
            return status.Errorf(codes.Internal, "[ERROR] random value is too big. Value was [%d]", randomValue)
        }

        // 2. calculate the sum and send it to a client
        sum := receivedValue + int64(randomValue)
        err = stream.Send(&rpc.CommunicateResponse{
            CurrentCount: int64(currentCount),
            Value:        sum,
        })

        if err != nil {
            return status.Errorf(codes.Unknown, "[ERROR] failed to send: %w\n", err)
        }
        log.Printf("send value (%d): %d + %d = %d", currentCount+1, receivedValue, randomValue, sum)

        // 3. receive a response from a client
        select {
        case <-ctx.Done():
            return status.Error(codes.DeadlineExceeded, "[ERROR] communication ends")
        case <-time.After(time.Second):
            res, err := stream.Recv()
            if err != nil {
                return status.Errorf(codes.Unknown, "[ERROR] failed to receive: %w\n", err)
            }
            receivedValue = res.GetValue()
        }
    }

    log.Println("Communicatiopn ends")
    return nil
}

In step 3, it returns an error if a client doesn’t send the response in a second.

Client implementation

The client has 4 steps.

  1. Initialization
  2. Receive the sum from the server
  3. Send the next value
  4. Close the connection
func (m *MiddleMan) Communicate(ctx context.Context, maxCount int64) {
    timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Second)
    defer cancel()

    client := rpc.NewMiddleClient(m.conn)
    stream, err := client.Communicate(timeoutCtx)
    if err != nil {
        log.Printf("[ERROR] failed to create a stream: %v\n", err)
        return
    }

    // 1. Initialization
    err = stream.Send(&rpc.CommunicateRequest{
        Max:   maxCount,
        Value: 5,
    })
    if err != nil {
        log.Printf("[ERROR] failed to send initial values:%v", err)
        return
    }

    for count := 0; ; count++ {
        // 2. Receive the sum from server
        res, err := stream.Recv()
        if err != nil {
            common.ShowErrorMessageInTrailer(stream) // not show anything
            if errors.Is(err, io.EOF) {
                break
            }

            log.Printf("[ERROR] failed to receive: %v\n", err)
            return
        }

        log.Printf("received value (%d): %d", count+1, res.GetValue())

        // 3. Send the next value
        randomValue := rand.Intn(10)
        err = stream.Send(&rpc.CommunicateRequest{
            Value: res.GetValue() + int64(randomValue),
        })
        if err != nil {
            if errors.Is(err, io.EOF) {
                _, err := stream.Recv()
                log.Printf("[ERROR] failed to receive 2: %v\n", err)
                common.ShowErrorMessageInTrailer(stream) // not show anything
                break
            }

            // when the connection is lost...
            // [ERROR] failed to receive: rpc error: code = Unavailable desc = error reading from server: EOF
            log.Printf("[ERROR] failed to send value:%v", err)
            return
        }
    }

    // 4. Close the connection
    err = stream.CloseSend()
    if err != nil {
        log.Printf("[ERROR] failed to close and send: %v", err)
    } else {
        log.Println("Communication ends")
    }
}

The error is not shown by ShowErrorMessageInTrailer() function but it can be handled in a normal way.

The connection might be lost during the communication. In this case, the error isn’t caught by errors.Is(err, io.EOF) for some reason even though the error shows EOF…

Considering connection lost

We must consider the connection lost. It can take max 15 minutes until gRPC client gives up communicating with a gRPC server when the connection between the server and the client is lost. But it seems to depend on the environment. It’s better to test it first. Even though this application is running in a Docker container, the behavior is different depending on the system. I tested it with Windows and Linux. On Windows, it reports the connection error soon but it took 15 min on Linux. If you have the same problem, it might be worth setting WithKeepaliveParams in a reasonable duration.

func main() {
    grpcConn, err := grpc.Dial(
        serverHost,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        // Add this parameter
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time: time.Second * 10,
        }),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }

    defer grpcConn.Close()

    middleMan := client.NewMiddleMan(grpcConn)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    middleMan.Communicate(ctx, 4)

    // exit by ctrl + c
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt)
    <-quit
    log.Println("exit")
}

Result A successfull case

Server

$ make runServer
2023/06/03 08:13:24 start gRPC server
2023/06/03 08:13:56 send value (1): 5 + 37 = 42
2023/06/03 08:13:57 send value (2): 44 + 77 = 121
2023/06/03 08:13:58 send value (3): 121 + 70 = 191
2023/06/03 08:13:59 send value (4): 194 + 31 = 225
2023/06/03 08:14:00 Communicatiopn ends

Client

$ make runClient
2023/06/03 08:13:56 received value (1): 42
2023/06/03 08:13:57 received value (2): 121
2023/06/03 08:13:58 received value (3): 191
2023/06/03 08:13:59 received value (4): 225
2023/06/03 08:14:00 Communication ends

Result A error case

Server

$ make runServer
2023/06/03 07:57:35 start gRPC server
2023/06/03 07:57:56 send value (1): 5 + 15 = 20
2023/06/03 07:57:57 send value (2): 26 + 28 = 54

Client

$ make runClient
2023/06/03 07:57:56 received value (1): 20
2023/06/03 07:57:57 received value (2): 54
2023/06/03 07:57:58 [ERROR] failed to receive: rpc error: code = Internal desc = [ERROR] random value is too big. Value was [63]

Comments

Copied title and URL