Welcome to today’s post.
In today’s post I will be showing you how to implement the gRPC Client Streaming call type.
I will also explain what a Client Streaming call type is, how it is implemented from a gRPC Service, how it can be used within a gRPC Client, and finally, when to use it in application development scenarios.
In a previous post I explained what gRPC services were, and showed how to implement a basic gRPC service using the gRPC ASP.NET Core project template, then showed how to implement a basic gRPC console client that connected to our gRPC service, then executed a unary (single request, single response) RPC call to retrieve data from the gRPC service.
In another post I defined and showed how to implement the gRPC server streaming call type.
In the first section, I will define a gRPC Client Streaming Call.
Defining a Client Streaming gRPC Call
I will now explain what a Client Streaming gRPC call is.
A gRPC Client Streaming call consists of the following actions:
- (Client) A connection to the gRPC service.
- (Client) A connection to the gRPC method.
- (Client) Send messages to the gRPC server method through RequestStream.WriteAsync().
- (Server) Retrieves each client message using the request input parameter of type IAsyncStreamReader.
- (Server) Execute a while loop that retrieves the next message from the client request stream using MoveNext().
- (Server) Retrieve the current message from the stream using the Current property.
- (Client) Set the stream as complete with a call to RequestStream.CompleteAsync().
- (Client) Retrieve server method result from the server method.
The following diagram shows that after the channel and RPC library connection are established, an initial request starts the stream of messages pushed from the client to the server. After the request is completed, the result from the server is retrieved by the client.
data:image/s3,"s3://crabby-images/ec80c/ec80c54c39ed2559a1179a4f22f34965d698b9ce" alt=""
Some obvious real-world uses of an RPC client stream include:
- posting a real-time stream of local weather data to a weather data service.
- posting scores from a local sports game to a sports service.
- posting purchase data from a local bookstore to a sales reporting service.
- uploading an image to a photo storage service.
The above consist essentially of data (messages) being streamed from a client to a server, which is equivalent to the RPC connection to a gRPC service library. From the client we then push data to the service method request stream. With the server stream call pattern, we were pulling messages from the service. With the client stream call pattern, we are pushing messages to the service.
In the above use-cases, large quantities of data streamed from the server would justify using asynchronous server streaming. Small datasets would only require a finite loop of unary RPC calls.
I will now show how to implement gRPC client streaming calls.
Implementation of a gRPC Service with Client Streaming
Below is our RPC Protobuf IDL file that defines the service and operation contracts, including the data (message) contracts. I will explain the definitions later.
gRPC Server
syntax = "proto3";
option csharp_namespace = "GrpcBookLoanService";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
package library;
// The library service definition.
service Library {
// Returns a matching library - unary call
rpc GetLibrary(LibraryRequest) returns (LibraryReply);
// Returns open libraries given time of day - server streaming call
rpc StreamLibraryStatusFromServer(LibraryRequest) returns (stream LibraryStatusReply);
// Receives a client request messages containing a book return or borrow action - client streaming call
rpc StreamingTransferActionFromClient (stream LibraryBookTransferRequest) returns (LibraryTransferReply);
}
// The request message containing the library name.
message LibraryRequest {
int32 library_id = 1;
}
// The response message containing the library details.
message LibraryReply {
string library_name = 1;
string library_location = 2;
}
// The response message containing the libraries and open status.
message LibraryStatusReply {
string library_name = 1;
string library_location = 2;
bool isOpen = 3;
string status = 4;
}
// the request message containing a book return or borrow action.
message LibraryBookTransferRequest
{
int32 library_id = 1;
string book_id = 2;
string transfer_action = 3;
google.protobuf.Timestamp date_action = 4;
}
// The response message containing the transfer completion status.
message LibraryTransferReply {
int32 number_returned = 1;
int32 number_borrowed = 2;
google.protobuf.Timestamp date_completed = 3;
}
As we did for the unary and server streaming gRPC call types, we add an extra RPC method definition to the service library definition in the Protobuf IDL file.
Our service library contains the definition of the operation contract for the client streaming RPC method:
// Receives a client request messages containing a book return or borrow action - client streaming call
rpc StreamingTransferActionFromClient (stream LibraryBookTransferRequest) returns (LibraryTransferReply);
The above method, unlike a server streaming call, that has an initial request and returns a stream, a client stream differs in that initially following the client connection establishment, messages are pushed asynchronously through the client request stream and a response is returned to the gRPC client after the request stream completes.
As we did for server streamed calls, we can apply configurations (which I will explain later) to the initially established RPC connection to the client stream call that determine how long the request stream can run (its duration) and return a response for each connection and whether the client can on demand put an end to the stream.
The implementation of the library service (extended to include the client stream RPC call) is below:
LibraryService.cs
using Grpc.Core;
using GrpcBookLoanService;
namespace GrpcBookLoanService.Services
{
public class LibraryService : Library.LibraryBase
{
private readonly ILogger<LibraryService> _logger;
public LibraryService(ILogger<LibraryService> logger)
{
_logger = logger;
}
public LibraryReply GetLibraryDetails(int id)
{
string slibrary_name = String.Empty;
string slibrary_location = String.Empty;
switch (id)
{
case 1:
slibrary_name = "NSW State Library";
slibrary_location = "Sydney";
break;
case 2:
slibrary_name = "Eastwood Library";
slibrary_location = "Eastwood";
break;
case 3:
slibrary_name = "Chatswood Library";
slibrary_location = "Chatswood";
break;
default:
slibrary_name = "N/A";
slibrary_location = "N/A";
break;
}
return new LibraryReply
{
LibraryName = slibrary_name,
LibraryLocation = slibrary_location
};
}
public LibraryStatusReply GetLibraryStatusDetails(int id)
{
string slibrary_name = String.Empty;
string slibrary_location = String.Empty;
LibraryReply reply = GetLibraryDetails(id);
bool openStatus = false;
DateTime dt = DateTime.Now;
if ((dt.Hour >= 9) && (dt.Hour <= 17))
openStatus = ((dt.Hour >= 9) && (dt.Hour <= 17)) ? true: false;
Random random = new Random();
int statusValue = random.Next(1, 3);
string status = String.Empty;
switch (statusValue)
{
case 1:
status = "Quiet";
break;
case 2:
status = "Normal";
break;
case 3:
status = "Busy";
break;
default:
status = "Normal";
break;
}
return new LibraryStatusReply
{
LibraryName = reply.LibraryName,
LibraryLocation = reply.LibraryLocation,
IsOpen = openStatus,
Status = (openStatus == true ? status: "Quiet")
};
}
// Example of a unary gRPC call.
public override Task<LibraryReply> GetLibrary(LibraryRequest request, ServerCallContext context)
{
LibraryReply reply = GetLibraryDetails(request.LibraryId);
return Task.FromResult(new LibraryReply
{
LibraryName = reply.LibraryName,
LibraryLocation = reply.LibraryLocation
});
}
// Example of a server streaming gRPC call.
public override async Task StreamLibraryStatusFromServer(LibraryRequest request,
IServerStreamWriter<LibraryStatusReply> responseStream, ServerCallContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
{
LibraryStatusReply libraryStatusReply = GetLibraryStatusDetails(request.LibraryId);
await responseStream.WriteAsync(libraryStatusReply);
await Task.Delay(TimeSpan.FromSeconds(30), context.CancellationToken);
}
}
// Example of a client streaming gRPC call.
public override async Task<LibraryTransferReply> StreamingTransferActionFromClient(
IAsyncStreamReader<LibraryBookTransferRequest> requestStream, ServerCallContext context)
{
int nLoaned = 0, nReturned = 0;
DateTime dtCompleted1 = DateTime.Now;
Google.Protobuf.WellKnownTypes.Timestamp dtCompleted = new Google.Protobuf.WellKnownTypes.Timestamp();
while (!context.CancellationToken.IsCancellationRequested &&
await requestStream.MoveNext())
{
var message = requestStream.Current;
// Add each record to a data reporting service ...
// message.LibraryName
// message.BookId
// message.TransferAction
// message.DateAction
if (message.TransferAction == "Loan")
nLoaned++;
if (message.TransferAction == "Return")
nReturned++;
dtCompleted = message.DateAction;
}
await Task.Delay(TimeSpan.FromSeconds(10), context.CancellationToken);
return new LibraryTransferReply
{
NumberBorrowed = nLoaned,
NumberReturned = nReturned,
DateCompleted = dtCompleted
};
}
}
}
The client streaming RPC method call:
public override async Task<LibraryTransferReply> StreamingTransferActionFromClient(
IAsyncStreamReader<LibraryBookTransferRequest> requestStream,
ServerCallContext context
)
Which has a stream of message requests of type LibraryBookTransferRequest:
message LibraryBookTransferRequest
{
int32 library_id = 1;
string book_id = 2;
string transfer_action = 3;
google.protobuf.Timestamp date_action = 4;
}
And returns a server response message asynchronously of type LibraryTransferReply:
message LibraryTransferReply {
int32 number_returned = 1;
int32 number_borrowed = 2;
google.protobuf.Timestamp date_completed = 3;
}
Note also that one of the fields in the ProtoBuf message contract is a date time type and is declared with a type:
google.protobuf.Timestamp
To be able to use the above type, we import the following Protocol buffers package:
import "google/protobuf/timestamp.proto";
Note: The .NET date time type class, DateTime cannot be used as a date type in a Protobuf Message structure. You need to use the special type google.protobuf.Timestamp.
I have included a condition on the context to allow the client to stop (and cancel) the request stream. To cancel the request stream, we can access the CancellationToken.IsCancellationRequested property of the ServerCallContext parameter and break the while loop in the server call.
Messages containing the book loaned or returned are written to the request stream through the requestStream parameter, which is of type IAsyncStreamReader<LibraryBookTransferRequest>.
Below is the code for the gRPC .NET Core service application build and services collection configuration:
Program.cs
using GrpcBookLoanService.Services;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddGrpc();
var app = builder.Build();
// Configure the HTTP request pipeline.
app.MapGrpcService<LibraryService>();
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
app.Run();
In the gRPC client I will explain how client messages are pushed to the server stream, how a request stream completes, and how the client stream cancellation occurs.
Implementation of a gRPC Client to Consume the gRPC Client Streaming Call
As we did in the previous post that demonstrated a call to a server stream gRPC call, the gRPC client is a console application that will demonstrate how to establish a channel, a connection, run a request stream, complete the request stream, and retrieve the result from the completed stream call.
Below is the code excerpt that shows how we call a gRPC client stream method:
Program.cs:
using Grpc.Net.Client;
using GrpcBookLoanService;
using System.Threading;
Console.WriteLine("Book Loan gRPC Service Client");
Console.WriteLine("Select the RPC call to make :");
Console.WriteLine("1 = RPC Server Stream.");
Console.WriteLine("2 = RPC Client Stream.");
Console.WriteLine();
int demoSelection = Convert.ToInt32(Console.ReadLine());
if (demoSelection > 2)
{
Console.WriteLine("No selections made. Exiting program.");
return;
}
var channel = GrpcChannel.ForAddress("http://localhost:5262");
var libraryClient = new Library.LibraryClient(channel);
Console.WriteLine("Enter a Library ID [1..3] ..");
int libraryId = Convert.ToInt32(Console.ReadLine());
Console.WriteLine($"Retrieving Library {libraryId} Details.. ");
var response = libraryClient.GetLibrary(new LibraryRequest { LibraryId = libraryId });
Console.WriteLine("Library Name: " + response.LibraryName);
Console.WriteLine("Library Location: " + response.LibraryLocation);
Console.WriteLine($"Retrieving Status Details for Library {libraryId} ..");
int streamRunningDuration = 120;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
if (demoSelection == 1)
{
// refer to previous post for this code
}
if (demoSelection == 2)
{
var serverCall = libraryClient.StreamingTransferActionFromClient(
null,
DateTime.UtcNow.AddSeconds(streamRunningDuration),
cancellationTokenSource.Token);
try
{
while (!cancellationTokenSource.IsCancellationRequested)
{
for (var count = 0; count<10; count++)
{
DateTime currentServerDateTime = DateTime.UtcNow;
DateTime currentDateTime = currentServerDateTime.ToLocalTime();
Random random = new Random();
int randomBook = random.Next(1, 10);
int randomAction = random.Next(1, 3);
string action = (randomAction == 1 ? "Loan" : "Return");
Console.WriteLine("Pushing following record to RPC server ...");
Console.WriteLine($"Local Date/Time : {currentDateTime}");
Console.WriteLine($"Server Date/Time : {currentServerDateTime}");
Console.WriteLine($"BookId : {randomBook}");
Console.WriteLine($"LibraryId : {libraryId}");
Console.WriteLine($"Action : {action}");
Console.WriteLine("-----------------------------");
Console.WriteLine();
await serverCall.RequestStream.WriteAsync(new LibraryBookTransferRequest
{
BookId = randomBook.ToString(),
LibraryId = libraryId,
TransferAction = action,
DateAction = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(currentServerDateTime)
});
}
// current running borrowing status..
// Complete the async stream.
await serverCall.RequestStream.CompleteAsync();
// Retrieve result from request call handle.
var returnedValue = await serverCall;
Console.WriteLine("Status returned from the RPC service ...");
Console.WriteLine($"Report Date/Time : {returnedValue.DateCompleted}");
Console.WriteLine($"Number Borrowed : {returnedValue.NumberBorrowed}");
Console.WriteLine($"Number Returned : {returnedValue.NumberReturned}");
Console.WriteLine("-----------------------------");
Console.WriteLine();
Console.WriteLine("Continue send more messages to the client stream (Y/N)?");
ConsoleKeyInfo userInput = Console.ReadKey();
char key = userInput.KeyChar;
Console.WriteLine();
Console.WriteLine($"Option selected: {key}");
if ((key == 'N') || (key == 'n'))
{
Console.WriteLine("Cancelling server streaming..");
cancellationTokenSource.Cancel();
}
}
}
catch (Grpc.Core.RpcException gex)
{
if (gex.StatusCode == Grpc.Core.StatusCode.DeadlineExceeded)
Console.WriteLine($"The gRPC service returned exceeded the specified deadline of {streamRunningDuration} seconds.");
else
Console.WriteLine($"The gRPC service returned the following error: {gex.Message.ToString()}");
cancellationTokenSource.CancelAfter(3000);
}
Console.WriteLine("Client streaming completed.");
cancellationTokenSource.Dispose();
}
In the first section of code, I choose the type of call to run (the server stream call we covered in the previous post) as an input from the user.
data:image/s3,"s3://crabby-images/5b5ae/5b5ae566bc6b8eef6b6229357785da75899676ac" alt=""
After choosing the library identifier, the client pushes 10 records with some randomly generated fields to the RPC client stream method:
int streamRunningDuration = 600;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
var serverCall = libraryClient.StreamLibraryStatusFromServer(
new LibraryRequest
{
LibraryId = libraryId
},
null,
DateTime.UtcNow.AddSeconds(streamRunningDuration),
cancellationTokenSource.Token
);
The above also includes the provision to allow the streaming task to the cancelled by the client with the cancellation token.
The first two records being written (pushed) into the remote request stream are shown below:
data:image/s3,"s3://crabby-images/a5d28/a5d28d1c58ff496b9620127c57f732d5d8bbe8c0" alt=""
Notice also that I have written the date time field as UTC dates. The reason is that gRPC messages only allow UTC dates and not dates in the local time zone. We then have set the date time variable as shown:
DateTime currentServerDateTime = DateTime.UtcNow;
The date time value is then included in the Protobuf message with the following conversion:
Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime({datetime value})
The message that is written asynchronously to the request stream with the converted date time value is as shown:
await serverCall.RequestStream.WriteAsync(new LibraryBookTransferRequest
{
BookId = randomBook.ToString(),
LibraryId = libraryId,
TransferAction = action,
DateAction = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(currentServerDateTime)
});
After the sequence of messages completes being written to the server from the request stream and the call request stream completes, the server response is returned with a summary of the number of books loaned and returned:
data:image/s3,"s3://crabby-images/d96d8/d96d879e9c4cc55cb6c25956a575d6fcbb5bcd7d" alt=""
After the messages have been written to the request stream, the asynchronous request stream is completed with the following call:
await serverCall.RequestStream.CompleteAsync();
We can then retrieve the summary result, which is of type LibraryTransferReply from the completed RPC client stream call:
var returnedValue = await serverCall;
The client stream call is then either re-run or cancelled by the user.
With exceptions and deadlines in the RPC calls, I explained how they are handled in the previous post.
I have demonstrated how to implement a client stream gRPC .NET Core service.
I will explore the use of the two-way (duplex) streaming of a gRPC call in a future post.
That is all for today’s post.
I hope you have found the above post useful and informative.
data:image/s3,"s3://crabby-images/9201e/9201e6ebf25181d9fcf1b057bdc201aa18afdd43" alt=""
Andrew Halil is a blogger, author and software developer with expertise of many areas in the information technology industry including full-stack web and native cloud based development, test driven development and Devops.