We will learn how to implement the client streaming gRPC function in this post. If you have something unknown in this post, please go to the following pages too.
- Dart How to setup devcontainer for gRPC
- Dart The first gRPC server and client with timestamp
- Dart Server Streaming gRPC function example to download a file
- (This post) Dart Client Streaming gRPC function example to Upload a file
- Dart gRPC Bidirectional streaming example
You can clone my GitHub repository if you want to try it yourself.
How to define streaming RPC in proto file
Let’s check the function definition in proto file. The target function is Upload. stream keyword is added only to the function parameter.
syntax = "proto3";
import "google/protobuf/timestamp.proto";
option go_package = "api-test/grpc/apitest";
service Middle {
  // Unary RPC
  rpc Ping(PingRequest) returns (PingResponse) {}
  // Unary RPC
  rpc SayHello(HelloRequest) returns (HelloResponse) {}
  // Server Streaming RPC
  rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
  // Client Streaming RPC
  rpc Upload(stream UploadRequest) returns (UploadResponse) {}
  // Bidirectional Streaming RPC
  rpc Communicate(stream CommunicateRequest)
      returns (stream CommunicateResponse) {}
}
message UploadRequest {
  string filename = 1;
  bytes chunk = 2;
}
message UploadResponse {
  bool result = 1;
  int64 writtenSize = 2;
  string message = 3;
}UploadRequest contains filename and chunk. We will implement it later but as you can imagine, filename is used only for the first message to let the server knows where to create the file. We can make it optional but the filename is always sent to the server in this example.
Server implementation
Let’s implement it on the server side. I will show you the complete code first.
@override
Future<rpc.UploadResponse> upload(
    ServiceCall call, Stream<rpc.UploadRequest> request) async {
  var writtenSize = 0;
  print("--- upload was triggered ---");
  try {
    var count = 0;
    var content = [];
    await for (final req in request) {
      if (req.filename == "error") {
        throw ArgumentError("filename must not be 'error'");
      }
      await Future.delayed(Duration(milliseconds: 100));
      writtenSize += req.chunk.length;
      final line = utf8.decode(req.chunk);
      print("received(${++count}):${req.filename}, $line");
      content.add(line);
    }
    print("upload completed");
    print("content: ${content.join("\n")}");
    final response = rpc.UploadResponse();
    response.message = "COMPLETED";
    response.writtenSize = $fixnum.Int64(writtenSize);
    response.result = true;
    return response;
  } on GrpcError catch (e) {
    print("caught an GrpcError in upload: $e");
  } on ArgumentError catch (e) {
    print("caught an ArgumentError. Set trailers");
    call.sendTrailers(
      status: StatusCode.invalidArgument,
      message: e.message,
    );
  } catch (e) {
    print("caught an error in upload: $e");
  } finally {
    if (call.isCanceled) {
      print("upload canceled");
    }
  }
  final response = rpc.UploadResponse();
  response.message = "FAILED";
  response.writtenSize = $fixnum.Int64(writtenSize);
  response.result = false;
  return response;
}The return value is not Stream but Future because the server consumes the stream in it and then returns the result only once. The request parameter is instead Stream.
Future<rpc.UploadResponse> upload(
    ServiceCall call, Stream<rpc.UploadRequest> request) asyncThis is the main part of the implementation to receive the data from the client side. A stream can be consumed with await for loop. We can do whatever we want with these values. We can return the response in case of an error but it throws an error instead in this example.
var count = 0;
var content = [];
await for (final req in request) {
  if (req.filename == "error") {
    throw ArgumentError("filename must not be 'error'");
  }
  await Future.delayed(Duration(milliseconds: 100));
  writtenSize += req.chunk.length;
  final line = utf8.decode(req.chunk);
  print("received(${++count}):${req.filename}, $line");
  content.add(line);
}We need to create the response. Since writtenSize is defined as int64 in protocol buffer, we need to convert int value to int64 y using fixnum package.
final response = rpc.UploadResponse();
response.message = "COMPLETED";
response.writtenSize = $fixnum.Int64(writtenSize);
response.result = true;
return response;The last part is error handling. It’s a good thing to write log when the call is canceled. The handling should always be written in finally block. If an error needs to be notified to a client-side, call call.sendTrailers() with the desired status code and the message.
} on GrpcError catch (e) {
  print("caught an GrpcError in upload: $e");
} on ArgumentError catch (e) {
  print("caught an ArgumentError. Set trailers");
  call.sendTrailers(
    status: StatusCode.invalidArgument,
    message: e.message,
  );
} catch (e) {
  print("caught an error in upload: $e");
} finally {
  if (call.isCanceled) {
    print("upload canceled");
  }
}
final response = rpc.UploadResponse();
response.message = "FAILED";
response.writtenSize = $fixnum.Int64(writtenSize);
response.result = false;
return response;Be aware that it must return the response in all paths. We could ignore the return statement for a server streaming function but not for a client streaming function because it returns Future but not Stream. However, IDE tells us the following error, so we don’t have to worry about this mistake.
The body might complete normally, causing 'null' to be returned, but the return type, 'FutureOr<UploadResponse>', is a potentially non-nullable type.
Try adding either a return or a throw statement at the end.Client implementation
The main flow of the client implementation is simple. Call the upload function. That’s it.
Future<void> upload(String filename, int timeoutMs) async {
  Stream<rpc.UploadRequest> readFile() async* {
    final absPath = p.join(resourcePath, "data.txt");
    final file = File(absPath);
    final lines = file
        .openRead() //
        .transform(utf8.decoder)
        .transform(LineSplitter())
        .transform(utf8.encoder);
    await for (final lineBytes in lines) {
      final request = rpc.UploadRequest();
      request.filename = filename;
      request.chunk = lineBytes;
      Future.delayed(Duration(milliseconds: 300));
      yield request;
    }
  }
  print("--- upload ---");
  try {
    final response = await client.upload(
      readFile(),
      options: CallOptions(timeout: Duration(milliseconds: timeoutMs)),
    );
    print("upload completed\n");
    print("response: {$response}");
  } on GrpcError catch (e) {
    print("caught an GrpcError: $e");
  } catch (e) {
    print("caught an error: $e");
  }
}Since it must send streaming data, we need a function that returns Stream. It’s better to define it in a separate function. It’s readFile(). Basically, the simplest function format is the following.
Stream<rpc.XxxxxRequest> doSomething() async* {
  await for(final value in values){
    yield value;
  }
}Return type is Stream.
Add async keyword with an asterisk.
Send a value repeatedly with yield keyword.
The internal logic for readFile file is simple. Open a file, decode the content to UTF8, split by line break, encode it to bytes, and send it to the server.
Result
Let’s run a server and call upload function with the following parameters.
await handler.upload("error", 500);
await handler.upload("data.txt", 100);
await handler.upload("data.txt", 10000);Server side
$ make runServer
Server listening on port 8080...
--- upload was triggered ---
caught an ArgumentError. Set trailers
--- upload was triggered ---
--- upload was triggered ---
received(1):data.txt, first line
caught an GrpcError in upload: gRPC Error (code: 1, codeName: CANCELLED, message: Cancelled, details: null, rawResponse: null, trailers: {})
upload canceled
received(1):data.txt, first line
received(2):data.txt, second line
received(3):data.txt, third line
received(4):data.txt, fourth line
received(5):data.txt, and so on...
received(6):data.txt, end line
upload completed
content: first line
second line
third line
fourth line
and so on...
end lineClient side
$ make runClient
--- upload ---
caught an GrpcError: gRPC Error (code: 3, codeName: INVALID_ARGUMENT, message: filename must not be 'error', details: [], rawResponse: null, trailers: {})
--- upload ---
caught an GrpcError: gRPC Error (code: 4, codeName: DEADLINE_EXCEEDED, message: Deadline exceeded, details: null, rawResponse: null, trailers: {})
--- upload ---
upload completed
response: {result: true
writtenSize: 62
message: COMPLETED
}The error code and the message are set as expected when ArgumentError is thrown for the first call.
For the second call, the call is canceled by the caller due to the timeout while the server side sleeps for 100 ms. In the meantime, the client calls the function with different parameters. Therefore, the logging order is not as expected.
--- upload was triggered ---   <-- second call
--- upload was triggered ---   <-- third call
received(1):data.txt, first line
caught an GrpcError in upload: gRPC Error (code: 1, codeName: CANCELLED, message: Cancelled, details: null, rawResponse: null, trailers: {})
upload canceled   <-- canceled for the second call
received(1):data.txt, first line   <-- logging for third callBy the way, I had a connection problem.
I added the following code to the server implementation.
await Future.delayed(Duration(milliseconds: 100));Before adding this code, the connection for the third call with the following parameters was disconnected for some reason.
await handler.upload("error", 500);
await handler.upload("data.txt", 1);
await handler.upload("data.txt", 10000);It didn’t always occur but often for the first attempt after the server starts up.
Hmm… It’s still unclear why it happened.

 
  
  
  
  


Comments