Dart gRPC Bidirectional streaming example

I have posted other gRPC articles before. If something in this post is unclear, please check those pages too.

You can clone my GitHub repository if you want to try it yourself.

Proto file definition

The server streaming and the client streaming functions each have only one stream keyword, but the bidirectional streaming function has two: one on the parameter and one on the return type.

syntax = "proto3";

import "google/protobuf/timestamp.proto";

option go_package = "api-test/grpc/apitest";

service Middle {
  // Unary RPC
  rpc Ping(PingRequest) returns (PingResponse) {}
  // Unary RPC
  rpc SayHello(HelloRequest) returns (HelloResponse) {}
  // Server Streaming RPC
  rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
  // Client Streaming RPC
  rpc Upload(stream UploadRequest) returns (UploadResponse) {}
  // Bidirectional Streaming RPC
  rpc Communicate(stream CommunicateRequest)
      returns (stream CommunicateResponse) {}
}

message CommunicateRequest {
  int64 max = 1;
  int64 value = 2;
}

message CommunicateResponse {
  int64 currentCount = 1;
  int64 value = 2;
}

The Communicate function is the target function in this post. The communication starts from the client side. Each side generates a random value, adds it to the received value, and then sends the sum to the other side.

Server implementation

The implementation is basically a mix of a client streaming function and a server streaming function.

@override
Stream<rpc.CommunicateResponse> communicate(
    ServiceCall call, Stream<rpc.CommunicateRequest> request) async* {
  var isFirst = true;
  var maxCount = 0;
  var currentCount = 0;

  print("communicate was triggered");

  try {
    await for (final req in request) {
      if (isFirst) {
        isFirst = false;
        maxCount = req.max.toInt();
        if (maxCount <= 0) {
          maxCount = 3;
        }
      }
      currentCount++;
      if (currentCount > maxCount) {
        break;
      }

      print("received($currentCount): ${req.value}");

      final randomValue = Random().nextInt(100);
      if (randomValue >= 80) {
        throw Exception("generated random value was bigger than 80");
      }
      print("generated($currentCount): $randomValue");

      final sum = req.value + randomValue;
      final response = rpc.CommunicateResponse();
      response.currentCount = $fixnum.Int64(currentCount);
      response.value = sum;
      yield response;
    }
  } on GrpcError catch (e) {
    print("caught an GrpcError in upload: $e");
  } catch (e) {
    print("caught an error in communicate: $e");
    call.sendTrailers(status: StatusCode.aborted, message: e.toString());

    rethrow;
  } finally {
    if (call.isCanceled) {
      print("communication was canceled");
    }
  }
  print("communicate completed");
}

The return type is a Stream of responses. To make the function a stream generator, the async keyword needs an asterisk (async*). The function then returns a Stream instead of a Future.

@override
Stream<rpc.CommunicateResponse> communicate(
    ServiceCall call, Stream<rpc.CommunicateRequest> request) async* {

With the async* keyword, we can use an await for loop over the request stream.

await for (final req in request) {
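To see the combination of async*, await for, and yield without any gRPC types, here is a minimal sketch in plain Dart. The function name doubleEach and the int payload are made up for illustration; they are not part of the generated gRPC code.

```dart
import 'dart:async';

// Hypothetical handler: reads each incoming value and yields a response.
// It has the same shape as the gRPC handler (stream in, stream out).
Stream<int> doubleEach(Stream<int> request) async* {
  await for (final req in request) {
    // Each yield emits one event on the returned stream.
    yield req * 2;
  }
}

Future<void> main() async {
  final result = await doubleEach(Stream.fromIterable([1, 2, 3])).toList();
  print(result); // [2, 4, 6]
}
```

The returned stream closes on its own once the request stream is done and the loop ends.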

The main process needs to be done inside the await for loop. The first part is the initialization step, which runs only when the first message is received.

if (isFirst) {
  isFirst = false;
  maxCount = req.max.toInt();
  if (maxCount <= 0) {
    maxCount = 3;
  }
}

The communication is repeated until it reaches the max count. To make this work, the count variable is incremented on every message and the loop is left once the count exceeds the max.

currentCount++;
if (currentCount > maxCount) {
  break;
}
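The two steps above (initialize on the first message, stop after the max count) can be tried in isolation with plain Dart streams. This is only a sketch: the Request class is a hypothetical stand-in for rpc.CommunicateRequest, and the handler simply echoes the value.

```dart
import 'dart:async';

// Hypothetical stand-in for rpc.CommunicateRequest.
class Request {
  final int max;
  final int value;
  Request(this.max, this.value);
}

Stream<int> handle(Stream<Request> request) async* {
  var isFirst = true;
  var maxCount = 0;
  var currentCount = 0;

  await for (final req in request) {
    if (isFirst) {
      // Only the first message decides how many messages are handled.
      isFirst = false;
      maxCount = req.max <= 0 ? 3 : req.max;
    }
    currentCount++;
    if (currentCount > maxCount) {
      break; // leaving the loop also stops listening to the request stream
    }
    yield req.value;
  }
}

Future<void> main() async {
  final requests = Stream.fromIterable([
    Request(2, 10),
    Request(0, 20),
    Request(0, 30),
  ]);
  print(await handle(requests).toList()); // [10, 20]
}
```

The third request is received but not answered because the count has already exceeded the max taken from the first message.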

We need error handling in the example because production code must have it. Let’s throw an error when the random value is 80 or greater. In case of an error, this example calls call.sendTrailers() with the error code and the message. Once it’s called, the stream will be closed.

try {
  await for (final req in request) {
    ...

    final randomValue = Random().nextInt(100);
    if (randomValue >= 80) {
      throw Exception("generated random value was bigger than 80");
    }

    ...
  }
} on GrpcError catch (e) {
  print("caught an GrpcError in upload: $e");
} catch (e) {
  call.sendTrailers(status: StatusCode.aborted, message: e.toString());
  ...
}

Here it’s called in the catch block, but we can instead call it at the place where the exception would be thrown. In that case, call return right after it so that the following statements are not executed.

final randomValue = Random().nextInt(100);
if (randomValue >= 80) {
  call.sendTrailers(
    status: StatusCode.aborted,
    message: "generated random value was bigger than 80",
  );
  return;
}
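The effect of return inside an async* function can be checked with a small plain Dart sketch. The function name untilNegative and the list input are made up for illustration; no gRPC is involved.

```dart
import 'dart:async';

// Hypothetical generator: stops at the first negative value.
Stream<int> untilNegative(List<int> values) async* {
  for (final v in values) {
    if (v < 0) {
      // return ends the generator and closes the stream.
      // Nothing after this point runs, so no further value is yielded.
      return;
    }
    yield v;
  }
}

Future<void> main() async {
  print(await untilNegative([1, 2, -1, 3]).toList()); // [1, 2]
}
```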

When it’s ready to send data back to the client, set the values on the response object and yield it.

final sum = req.value + randomValue;
final response = rpc.CommunicateResponse();
response.currentCount = $fixnum.Int64(currentCount);
response.value = sum;
yield response;

Don’t forget to release used resources if necessary. The function could be canceled by the client side. We can detect the cancellation in the finally block with the call.isCanceled property. There is no resource to release in this example, so it just prints a message.

} finally {
  if (call.isCanceled) {
    print("communication was canceled");
  }
}
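The reason the finally block is a good place for cleanup can be seen in plain Dart: when the consumer stops listening, the generator is ended at its yield and its finally block runs. The sketch below uses only dart:async. The names numbers and log are made up, and call.isCanceled is not modeled here since it is specific to gRPC.

```dart
import 'dart:async';

final log = <String>[];

// Hypothetical endless generator with a cleanup step.
Stream<int> numbers() async* {
  try {
    for (var i = 0;; i++) {
      yield i;
    }
  } finally {
    // Runs when the listener cancels, e.g. by breaking out of await for.
    log.add('cleanup');
  }
}

Future<void> main() async {
  await for (final n in numbers()) {
    log.add('got $n');
    if (n == 2) {
      break; // cancels the subscription to numbers()
    }
  }
  print(log);
}
```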

Template to implement server for bidirectional streaming function

The template of the simple implementation looks like the following.

@override
Stream<rpc.XxxxResponse> nameOfFunction(
    ServiceCall call, Stream<rpc.XxxxRequest> request) async* {
  try {
    await for (final req in request) {
      // do something

      // send data to a client
      yield response;
    }
  } catch (e) {
    // error handling for uncaught exception
    // set error message to notify it to the client
    call.sendTrailers(
      status: StatusCode.aborted,
      message: e.toString(),
    );
  } finally {
    if (call.isCanceled) {
      // Release resources if necessary
    }
  }
}

Client implementation

Let’s implement the client side. It does basically the same thing as the server side.

Future<void> communicate() async {
  print("--- communicate ---");
  try {
    var currentValue = $fixnum.Int64(3);
    final requestStream = StreamController<rpc.CommunicateRequest>();

    final response = client.communicate(
      requestStream.stream.map((request) => request),
      options: CallOptions(timeout: Duration(seconds: 3)),
    );

    final req = rpc.CommunicateRequest();
    req.max = $fixnum.Int64(5);
    req.value = $fixnum.Int64(3);
    requestStream.add(req);

    var count = 0;
    try {
      await for (final res in response) {
        print("received(${++count}): ${res.value}");
        final req = rpc.CommunicateRequest();
        currentValue = res.value;
        final randomValue = Random().nextInt(10);
        if (randomValue > 8) {
          throw Exception("generated random value was bigger than 8");
        }
        req.value = currentValue + randomValue;
        requestStream.add(req);
      }
    } catch (e) {
      rethrow;
    } finally {
      await response.cancel();
    }

    print("communicate completed\n");
  } on GrpcError catch (e) {
    print("caught an GrpcError: $e");
  } catch (e) {
    print("caught an error: $e");
  }
}

This is where a bidirectional streaming function differs from a client streaming function. We need to send data multiple times, but each message must be sent only after the client receives data from the server side, so the timing needs to be controlled. A StreamController gives us that control: a message is sent to the server side each time requestStream.add() is called.

final requestStream = StreamController<rpc.CommunicateRequest>();

Next, we need the response stream, which client.communicate() returns. Its first parameter must be a stream, so we pass the stream exposed by the prepared StreamController.

final response = client.communicate(
  requestStream.stream.map((request) => request),
  options: CallOptions(timeout: Duration(seconds: 3)),
);

The callback in the map method is called whenever new data is added to requestStream by requestStream.add(). Here it returns the request unchanged. The following message is handled as the first request.

final req = rpc.CommunicateRequest();
req.max = $fixnum.Int64(5);
req.value = $fixnum.Int64(3);
requestStream.add(req);

After sending the first message, the client needs to communicate with the server. The response can be read by the await for loop. It handles the response and sends a message to the server again by calling requestStream.add(). Once the data is sent, it waits for the next response at the await for statement.

await for (final res in response) {
  print("received(${++count}): ${res.value}");
  final req = rpc.CommunicateRequest();
  currentValue = res.value;
  final randomValue = Random().nextInt(10);
  if (randomValue > 8) {
    throw Exception("generated random value was bigger than 8");
  }
  req.value = currentValue + randomValue;
  requestStream.add(req);
}
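The request/response ping-pong driven by a StreamController can be tried without a gRPC server. In the sketch below, fakeServer is a hypothetical stand-in for client.communicate(): it answers each request with value + 1 and stops after max replies. The names and the arithmetic are made up for illustration.

```dart
import 'dart:async';

// Hypothetical stand-in for the remote side.
Stream<int> fakeServer(Stream<int> requests, int max) async* {
  var count = 0;
  await for (final value in requests) {
    if (++count > max) {
      break;
    }
    yield value + 1;
  }
}

Future<List<int>> pingPong() async {
  final requestStream = StreamController<int>();
  final responses = fakeServer(requestStream.stream, 3);
  final received = <int>[];

  // First request. It is buffered until the stream is listened to.
  requestStream.add(0);

  await for (final res in responses) {
    received.add(res);
    // The next request is sent only after a response has arrived.
    requestStream.add(res + 10);
  }

  // Not awaited on purpose: the listener is already gone at this point.
  requestStream.close();
  return received;
}

Future<void> main() async {
  print(await pingPong()); // [1, 12, 23]
}
```

The loop ends when the stand-in server stops replying, which mirrors how the client loop ends when the server leaves its own loop after the max count.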

I guess the call is automatically canceled once the response stream is no longer used, but it is safer to call the cancel method explicitly to make sure that the call isn’t running anymore.

try {
  // something
} catch (e) {
  rethrow;
} finally {
  await response.cancel();
}
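How an error raised by a stream reaches the code around await for can be checked with a plain Dart sketch. The failing stream below is hypothetical and stands in for a response stream that ends with an error. The gRPC-specific cancel call is left out.

```dart
import 'dart:async';

// Hypothetical stream that emits one value and then fails.
Stream<int> failing() async* {
  yield 1;
  throw Exception('boom');
}

Future<List<String>> consume() async {
  final log = <String>[];
  try {
    await for (final v in failing()) {
      log.add('value $v');
    }
  } catch (e) {
    // The error from the stream is rethrown at the await for statement.
    log.add('error $e');
  } finally {
    // Runs in both the success and the error case.
    log.add('finally');
  }
  return log;
}

Future<void> main() async {
  print(await consume()); // [value 1, error Exception: boom, finally]
}
```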

Template to implement client for bidirectional streaming function

If your function needs to send a request after it receives a response, you can use the following template to implement your own function.

Future<void> runXxxx() async {
  try {
    final requestStream = StreamController<rpc.XxxxRequest>();

    final response = client.xxxx(
      requestStream.stream.map((request) => request),
      options: CallOptions(timeout: Duration(seconds: 3)),
    );

    final req = rpc.XxxxRequest();
    // set initial value here
    requestStream.add(req);

    try {
      await for (final res in response) {
        // do something with the response

        final req = rpc.XxxxRequest();
        // generate a new request and send it
        requestStream.add(req);
      }
    } catch (e) {
      rethrow;
    } finally {
      await response.cancel();
    }
  } catch (e) {
    // error handling for uncaught exception
    print("caught an error: $e");
  }
}

Result

Let’s see the result. The first run fails because the server side generates a value of 80 or greater. The error is correctly reported to the client.

$ make runClient                                                    | $ make runServer
--- communicate ---                                                 | Server listening on port 8080...
caught an GrpcError: gRPC Error (                                   | --- communicate was triggered ---
    code: 10, codeName: ABORTED,                                    | received(1): 3
    message: Exception: generated random value was bigger than 80,  | caught an error in communicate: Exception: generated random value was bigger than 80
    details: [], rawResponse: null, trailers: {})

The second run fails because of a client-side error. The server-side process is also canceled correctly.

$ make runClient                                                     | --- communicate was triggered ---
--- communicate ---                                                  | received(1): 3
received(1): 39                                                      | generated(1): 36
received(2): 66                                                      | received(2): 43
caught an error: Exception: generated random value was bigger than 8 | generated(2): 23
                                                                     | caught an GrpcError in upload: gRPC Error (
                                                                     |     code: 1, codeName: CANCELLED, 
                                                                     |     message: Cancelled, details: null, rawResponse: null, trailers: {})
                                                                     | communication was canceled
                                                                     | communicate completed

The communication completes without errors on the third run.

$ make runClient      | --- communicate was triggered ---
--- communicate ---   | received(1): 3
received(1): 22       | generated(1): 19
received(2): 66       | received(2): 29
received(3): 143      | generated(2): 37
received(4): 151      | received(3): 68
received(5): 186      | generated(3): 75
communicate completed | received(4): 148
                      | generated(4): 3
                      | received(5): 157
                      | generated(5): 29
                      | communicate completed

Check another post for data type

Check the following post if you want to know which data type can be used and how to define the advanced data type.

Array and dictionary data types are explained in the following post.
