Welcome to today’s post.
In today’s post I will be showing how to implement the gRPC server streaming call type.
I will also explain what a server streaming call type is, how it is implemented from a gRPC service, how it can be used within a gRPC client, and finally, when to use it in application development scenarios.
In a previous post I explained what gRPC services were, showed how to implement a basic gRPC service using the gRPC ASP.NET Core project template, then showed how to implement a basic gRPC console client that connected to our gRPC service, then executed a unary (single request, single response) RPC call to retrieve data from the gRPC service.
I will now explain what a server streaming gRPC call is.
A server streaming gRPC client call consists of the following actions:
- A connection to the gRPC service.
- Send a single request to the gRPC connection and retrieve the server response, ResponseStream.
- Execute a while loop that retrieves the next message from the response stream using MoveNext().
- Retrieve the current message from the stream using the Current property.
The following diagram shows that after the channel and RPC library connection are established, an initial request starts the stream of messages pulled from the server to the client.
data:image/s3,"s3://crabby-images/affe4/affe4a167333ee1b11d4075439ab2cf2f8d03f7d" alt=""
Some obvious real-world uses of an RPC server stream include:
- streaming of weather changes from a specified location
- streaming scores from a sports game
- streaming stock quotes from a listed company
The above consist essentially of data being streamed from a subscription which is equivalent to the RPC connection to a gRPC service library. From the subscription we then pull data from the service method response stream.
In the above use-cases, large quantities of data streamed from the server would justify using asynchronous server streaming. Small datasets would only require a finite loop of unary RPC calls. Before I go into the implementation of the gRPC client calls, I will show how to implement a gRPC server streaming call.
Implementation of a Server Streaming call in a gRPC Service
Like we did for a unary gRPC method call, before we can implement the associated classes, we defined a Protobuf definition file (with extension .proto).
Below is our Protobuf definition file:
library.proto
syntax = "proto3";
option csharp_namespace = "GrpcBookLoanService";
package library;
// The library service definition.
service Library {
// Returns a matching library - unary call
rpc GetLibrary(LibraryRequest) returns (LibraryReply);
// Returns open libraries given time of day - server streaming call
rpc StreamLibraryStatusFromServer(LibraryRequest) returns (stream LibraryStatusReply);
}
// The request message containing the library name.
message LibraryRequest {
int32 library_id = 1;
}
// The response message containing the library details.
message LibraryReply {
string library_name = 1;
string library_location = 2;
}
// The response message containing the libraries and open status.
message LibraryStatusReply {
string library_name = 1;
string library_location = 2;
bool isOpen = 3;
string status = 4;
}
As I explained in the previous post, the Protobuf is a language agnostic script that defines all the service contracts, operation contracts, and messages from our gRPC service. Once the project is built, all the type definitions from the Protobuf file are precompiled into a base library and are then available to the service library that is written in the target language of the project (in this case, C#).
Our service library contains the definition of the operation contract for the server streaming RPC method:
// Returns open libraries given time of day - server streaming call
rpc StreamLibraryStatusFromServer(LibraryRequest)
returns (stream LibraryStatusReply);
The above method, unlike a unary call, that takes a request and response, a server stream differs in that the response is not a single response, but potentially infinite responses that are streamed and returned as a response message back to the gRPC client. Once the client establishes a connection and makes the initial request call, the subsequent responses are pulled from the gRPC service asynchronously.
As I mentioned, the streamed responses can go on indefinitely, however there are some configurations (which I will explain later) to the initially established RPC connection that can determine how long the stream can run (its duration) and deliver responses for each connection and whether the client can on demand put an end to the stream.
In the example I will construct, to save time, the data returned from the RPC methods will be contrived and in-memory. If you wish to enhance it to use data from SQL data stores (or other types of databases) you can do so.
The implementation of the library service is below:
LibraryService.cs
using Grpc.Core;
using GrpcBookLoanService;
namespace GrpcBookLoanService.Services
{
public class LibraryService : Library.LibraryBase
{
private readonly ILogger<LibraryService> _logger;
public LibraryService(ILogger<LibraryService> logger)
{
_logger = logger;
}
public LibraryReply GetLibraryDetails(int id)
{
string slibrary_name = String.Empty;
string slibrary_location = String.Empty;
switch (id)
{
case 1:
slibrary_name = "NSW State Library";
slibrary_location = "Sydney";
break;
case 2:
slibrary_name = "Eastwood Library";
slibrary_location = "Eastwood";
break;
case 3:
slibrary_name = "Chatswood Library";
slibrary_location = "Chatswood";
break;
default:
slibrary_name = "N/A";
slibrary_location = "N/A";
break;
}
return new LibraryReply
{
LibraryName = slibrary_name,
LibraryLocation = slibrary_location
};
}
public LibraryStatusReply GetLibraryStatusDetails(int id)
{
string slibrary_name = String.Empty;
string slibrary_location = String.Empty;
LibraryReply reply = GetLibraryDetails(id);
bool openStatus = false;
DateTime dt = DateTime.Now;
if ((dt.Hour >= 9) && (dt.Hour <= 17))
openStatus = ((dt.Hour >= 9) && (dt.Hour <= 17)) ? true: false;
Random random = new Random();
int statusValue = random.Next(1, 3);
string status = String.Empty;
switch (statusValue)
{
case 1:
status = "Quiet";
break;
case 2:
status = "Normal";
break;
case 3:
status = "Busy";
break;
default:
status = "Normal";
break;
}
return new LibraryStatusReply
{
LibraryName = reply.LibraryName,
LibraryLocation = reply.LibraryLocation,
IsOpen = openStatus,
Status = (openStatus == true ? status: "Quiet")
};
}
// Example of a unary gRPC call.
public override Task<LibraryReply> GetLibrary(LibraryRequest request, ServerCallContext context)
{
LibraryReply reply = GetLibraryDetails(request.LibraryId);
return Task.FromResult(new LibraryReply
{
LibraryName = reply.LibraryName,
LibraryLocation = reply.LibraryLocation
});
}
// Example of a server streaming gRPC call.
public override async Task StreamLibraryStatusFromServer(
LibraryRequest request,
IServerStreamWriter<LibraryStatusReply> responseStream,
ServerCallContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
{
LibraryStatusReply libraryStatusReply = GetLibraryStatusDetails(request.LibraryId);
await responseStream.WriteAsync(libraryStatusReply);
await Task.Delay(TimeSpan.FromSeconds(30), context.CancellationToken);
}
}
}
}
The use of a simple helper function, GetLibraryDetails() returns library properties for a given library id.
The server streaming RPC method call,
public override async Task StreamLibraryStatusFromServer(
LibraryRequest request,
IServerStreamWriter<LibraryStatusReply> responseStream,
ServerCallContext context
)
which takes a message request of type LibraryRequest:
message LibraryRequest {
int32 library_id = 1;
}
and returns a stream of messages of type MessageStatusReply:
message LibraryStatusReply {
string library_name = 1;
string library_location = 2;
bool isOpen = 3;
string status = 4;
}
I mentioned earlier that the client can let the streamed service call run indefinitely or stop (and cancel) the running stream. To cancel the streamed messages, we can access the CancellationToken.IsCancellationRequested property of the ServerCallContext parameter and break the while loop.
Messages containing the library status are written back to the response stream through the responseStream parameter, which is of type IServerStreamWriter<LibraryStatusReply>
In the gRPC client I will explain how the messages are read from the server stream and how the cancellation occurs.
Below is the code for the gRPC .NET Core service application build and services collection configuration:
Program.cs
using GrpcBookLoanService.Services;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddGrpc();
var app = builder.Build();
// Configure the HTTP request pipeline.
app.MapGrpcService<LibraryService>();
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
app.Run();
In the next section I will show how a gRPC Client consumes a gRPC message stream.
Implementation of a gRPC Client to consume the gRPC Server Streaming call
As we did in the previous post that demonstrated a call to a unary gRPC call, the gRPC client is a console application that will demonstrate how to establish a channel, a connection, then make a request, and consume the message stream from the server.
Below is the code excerpt that shows how we call a gRPC server stream method:
Program.cs
using Grpc.Net.Client;
using GrpcBookLoanService;
Console.WriteLine("Book Loan gRPC Service Client");
var channel = GrpcChannel.ForAddress("http://localhost:5262");
var libraryClient = new Library.LibraryClient(channel);
Console.WriteLine("Enter a Library ID [1..3] ..");
int libraryId = Convert.ToInt32(Console.ReadLine());
Console.WriteLine($"Retrieving Library {libraryId} Details.. ");
var response = libraryClient.GetLibrary(new LibraryRequest { LibraryId = libraryId });
Console.WriteLine("Library Name: " + response.LibraryName);
Console.WriteLine("Library Location: " + response.LibraryLocation);
Console.WriteLine($"Retrieving Status Details for Library {libraryId} ..");
int streamRunningDuration = 600;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
var serverCall = libraryClient.StreamLibraryStatusFromServer(
new LibraryRequest
{
LibraryId = libraryId
},
null,
DateTime.UtcNow.AddSeconds(streamRunningDuration),
cancellationTokenSource.Token);
while (!cancellationTokenSource.IsCancellationRequested &&
await serverCall.ResponseStream.MoveNext(cancellationTokenSource.Token))
{
Console.WriteLine($"Received message for library {libraryId} ...");
var message = serverCall.ResponseStream.Current;
Console.WriteLine($"Date/Time : {DateTime.Now}");
Console.WriteLine($"Message (Is Open): {message.IsOpen}");
Console.WriteLine($"Message (Status) : {message.Status}");
Console.WriteLine("-----------------------------------");
Console.WriteLine("Continue stream (Y/N)?");
ConsoleKeyInfo userInput = Console.ReadKey();
char key = userInput.KeyChar;
Console.WriteLine();
Console.WriteLine($"Option selected: {key}");
if ((key == 'N') || (key == 'n'))
{
Console.WriteLine("Cancelling server streaming..");
cancellationTokenSource.Cancel();
}
}
Console.WriteLine("Server streaming completed.");
cancellationTokenSource.Dispose();
The first section of code, requests an input from the user:
data:image/s3,"s3://crabby-images/38601/38601baedbb6dea1e594147a3ba4c17b5a78c083" alt=""
Then executes a unary gRPC request call to retrieve library details as we did in the previous post. It includes establishing a channel and connection to the gRPC library service.
Earlier, I mentioned that we can control the stream duration by setting a configuration parameter in the gRPC library connection. We set the duration to 10 minutes (600 seconds) as shown:
int streamRunningDuration = 600;
var serverCall = libraryClient.StreamLibraryStatusFromServer(
new LibraryRequest
{
LibraryId = libraryId
},
null,
DateTime.UtcNow.AddSeconds(streamRunningDuration),
null
);
I also explained how the asynchronous server streaming can be cancelled by the client. This is done by creating an instance of a CancellationTokenSource and passing the Token property to the library connection:
int streamRunningDuration = 600;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
var serverCall = libraryClient.StreamLibraryStatusFromServer(
new LibraryRequest
{
LibraryId = libraryId
},
null,
DateTime.UtcNow.AddSeconds(streamRunningDuration),
cancellationTokenSource.Token
);
In the conditional part of the while loop we ensure the cancellation token is not requested, and only retrieve the next message from the stream when not cancelled.
while (!cancellationTokenSource.IsCancellationRequested &&
await serverCall.ResponseStream.MoveNext(cancellationTokenSource.Token))
{
…
}
As the stream message retrieval continues, each message and its property values will be logged to the debug console as shown:
data:image/s3,"s3://crabby-images/5d97b/5d97b9c68fe7e9e075a3269007fb21e715fe81ed" alt=""
The current message (of type LibraryStatusReply) is retrieved from the response from the line:
var message = serverCall.ResponseStream.Current;
When the user confirms the need to cancel the stream, the cancellation token that we passed into the response stream’s MoveNext() method is placed into a cancellation request state with the call:
cancellationTokenSource.Cancel();
I have also reclaimed the memory resource for the cancellation token source by running the following after the while loop exists:
cancellationTokenSource.Dispose();
Below we see the final messages following client cancellation of the server stream:
data:image/s3,"s3://crabby-images/c169f/c169fcc5c440233c548fa1c8ff6849b7ebb87349" alt=""
Once the cancellation occurs, you will notice the following errors showing on the debug console of the gRPC service:
data:image/s3,"s3://crabby-images/9e953/9e953a2100cf3986038760f5a1b743d449da0751" alt=""
This just tells us that the asynchronously running server streaming task of the gRPC method has been cancelled by the user. The service is still running and can still receive calls to the server streaming method.
I will now explain how to use the deadline configuration.
In the above example, I set the deadline duration to 600 seconds. That wasn’t short enough for me to reach the deadline for the few streamed messages I received from the gRPC server. The total duration we reached before I cancelled the task was around 90 seconds (equivalent to 3 responses).
I first set the duration to 2 minutes (2 x 60 seconds):
int streamRunningDuration = 120;
The other thing to know is that when the deadline is reached, the gRPC service will throw an exception of type Grpc.Core.RpcException.
The unhandled message of the error looks like this:
data:image/s3,"s3://crabby-images/daf44/daf447551b1d8e235a7f0095ba1152c749fc6121" alt=""
We can capture this error gracefully by wrapping the while loop within an exception block and handler as follows:
try
{
while (!cancellationTokenSource.IsCancellationRequested &&
await serverCall.ResponseStream.MoveNext(cancellationTokenSource.Token))
{
…
}
}
catch (Grpc.Core.RpcException gex)
{
if (gex.StatusCode == Grpc.Core.StatusCode.DeadlineExceeded)
Console.WriteLine($"The gRPC service returned exceeded the specified deadline of {streamRunningDuration} seconds.");
else
Console.WriteLine($"The gRPC service returned the following error: {gex.Message.ToString()}");
cancellationTokenSource.CancelAfter(3000);
}
Console.WriteLine("Server streaming completed.");
cancellationTokenSource.Dispose();
The status code of the server stream deadline exceeded exception is:
StatusCode.DeadlineExceeded
When we run the above gRPC client and process the first three stream response iterations, on the fourth iteration we get the error:
data:image/s3,"s3://crabby-images/3df04/3df04e895800ac64be18bbfae4650eeb8cb3589d" alt=""
In the exception handler I have given a user-friendly message and cancelled the cancellation token as it is no longer needed.
I have demonstrated how to implement a server stream gRPC .NET Core service and how to configure it for cancellations and deadlines from a gRPC client.
I will explore the use of the client streaming gRPC call in a future post.
That is all for today’s post.
I hope you have found the above post useful and informative.
data:image/s3,"s3://crabby-images/9201e/9201e6ebf25181d9fcf1b057bdc201aa18afdd43" alt=""
Andrew Halil is a blogger, author and software developer with expertise of many areas in the information technology industry including full-stack web and native cloud based development, test driven development and Devops.