Dart Server Streaming gRPC function example to download a file

eye-catch Dart and Flutter

We will dig into a streaming gRPC function in this post. Please check the following post too if you haven’t read it.

You can clone my GitHub repository if you want to try it yourself.

Sponsored links

How to define streaming RPC in proto file

Let’s define the functions in proto file first. The target function is Download. To define a streaming function, we need to add stream keyword to the return data type.

syntax = "proto3";

import "google/protobuf/timestamp.proto";

option go_package = "api-test/grpc/apitest";

service Middle {
  // Unary RPC
  rpc Ping(PingRequest) returns (PingResponse) {}
  // Unary RPC
  rpc SayHello(HelloRequest) returns (HelloResponse) {}
  // Server Streaming RPC
  rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
  // Client Streaming RPC
  rpc Upload(stream UploadRequest) returns (UploadResponse) {}
  // Bidirectional Streaming RPC
  rpc Communicate(stream CommunicateRequest)
      returns (stream CommunicateResponse) {}
}

message DownloadRequest {
  string filename = 1;
}

message DownloadResponse {
  string line = 1;
}

The way to define a message for stream is the same as the normal definition.

For a server streaming function, DownloadResponse contains line property. A client receives it several times.

Sponsored links

Server implementation

Let’s implement the function on the server side.

final resourcePath = "/workspaces/api-test/languages/dart/resouces/server/";

@override
Stream<rpc.DownloadResponse> download(
    ServiceCall call, rpc.DownloadRequest request) async* {
  try {
    if (request.filename == "error") {
      call.sendTrailers(
        status: StatusCode.invalidArgument,
        message: "error file can't be specified",
      );
      return;
    }

    await Future.delayed(Duration(milliseconds: 500));
    final absPath = p.join(resourcePath, request.filename);
    final file = File(absPath);
    final lines = file
        .openRead() //
        .transform(utf8.decoder)
        .transform(LineSplitter());

    print("lines for loop");
    await for (final line in lines) {
      yield rpc.DownloadResponse()..line = line;
    }
    print("download process completed");
  } on GrpcError catch (e) {
    print("caught an GrpcError in download: $e");
  } catch (e) {
    print("caught an error in download: $e");
  } finally {
    if (call.isCanceled) {
      print("download was cancelled");
    }
  }
}

Be aware of the difference between the unary function and the streaming function for the return data type and async keyword. It’s not Future but Stream. It’s not async but async* with an asterisk.

Stream<rpc.DownloadResponse> download(
    ServiceCall call, rpc.DownloadRequest request) async* {

It makes the function Stream. yield keyword can be used in the function with it. It sends data to the gRPC client multiple times at the line with yield.

The first thing is to open the target file. I didn’t find a good way to know the script file path. So I set an absolute path to resourcePath. Then, split it with a line separator.

final absPath = p.join(resourcePath, request.filename);
final file = File(absPath);
final lines = file
    .openRead() //
    .transform(utf8.decoder)
    .transform(LineSplitter());

The main part of this function is for loop with await keyword. The await keyword affects lines variable in the for loop. When it reads data, the response is sent to a client.

await for (final line in lines) {
  yield rpc.DownloadResponse()..line = line;
}

How to send the error info to the client

Throwing an error doesn’t send the error info to the client in gRPC. We must call call.sendTrailers() if an error needs to be sent to a client.

import "package:grpc/grpc.dart"; // for StatusCode

call.sendTrailers(
  status: StatusCode.invalidArgument,
  message: "error file can't be specified",
);

It is called in an error case in the code above and then return it. This can of course be called in catch block after throwing an error from the try block. Then, we can differentiate the error and set the desired status and the message. No error is notified to the client without this function call. I didn’t call this in catch block so we can check the result later.

How to know the cancellation

It seems that an error is not thrown when the call is canceled.

If we need to do something for the cancellation, it needs to be written in finally block.

try{
  // ... do something
} finally {
  if (call.isCanceled) {
    // cancellation steps here
  }
}

Note that the process reaches to await for loop even though it’s canceled by the client.

But why is it possible to skip the following statements without throwing an error…? I assume that the following statements are added to the onDone function callback in listen() method when using await for loop.

lines.listen((event) {}, 
  onError: {}, 
  onDone: {
    // the statements might be added here
  }, 
  cancelOnError: {
    // probably isCanceled it set to true here
    call.isCanceled = true;
  },
);

Client implementation

The client-side calls the function with the file name which we want to download. It can specify the timeout too. The timeout error occurs if the download doesn’t finish within this specified time.

Future<void> download(String filename, int timeoutMs) async {
  print("--- download ---");
  try {
    final request = rpc.DownloadRequest()..filename = filename;
    final responses = client.download(
      request,
      options: CallOptions(timeout: Duration(milliseconds: timeoutMs)),
    );

    var count = 0;
    var lines = [];
    await for (final res in responses) {
      print("received(${++count}): ${res.line}");
      lines.add(res.line);
    }
    print("download completed");
    print("content: $lines");
  } on GrpcError catch (e) {
    print("caught an GrpcError: $e");
  } catch (e) {
    print("caught an error: $e");
  }
}

The server side sends data multiple times, the client side needs to wait for the response at await for loop. It breaks the loop once the server cancels the function or completes the process.

We can write on GrpcError catch(e) if the grpc-related property needs to be read.

Result

I call it thrice with the following parameters.

await handler.download("test_file.txt", 500);
await handler.download("error", 1000);
await handler.download("unknown_file.txt", 1000);
await handler.download("test_file.txt", 1000);

The server side caught an error for the unknown file.

$ make runServer
Server listening on port 8080...
lines for loop
download was cancelled

lines for loop
caught an error in download: PathNotFoundException: Cannot open file, path = '/workspaces/api-test/languages/dart/resouces/server/unknown_file.txt' (OS Error: No such file or directory, errno = 2)

lines for loop
download process completed

The error state is correctly notified for the cancellation but the error is not notified to the client for the unknown file as I mentioned at the end of the server implementation.

$ make runClient
--- download ---
caught an GrpcError: gRPC Error (code: 4, codeName: DEADLINE_EXCEEDED, message: Deadline exceeded, details: null, rawResponse: null, trailers: {})
--- download ---
caught an GrpcError: gRPC Error (code: 3, codeName: INVALID_ARGUMENT, message: error file can't be specified, details: [], rawResponse: null, trailers: {})
--- download ---
download completed
content: []
--- download ---
received(1): aaaaaa
received(2): bbbbbb
received(3): cccccc
download completed
content: [aaaaaa, bbbbbb, cccccc]

Don’t forget to call call.sendTrailers() in catch block.

Go to the Next Step

Comments

Copied title and URL