How to Implement the gRPC Bidirectional Streaming .NET Core Call Type

Welcome to today’s post.

In today’s post I will explain the bidirectional streaming RPC call type and how to use it with gRPC in a .NET Core client and server scenario. In this post I will use the terms duplex, two-way, and bidirectional streaming interchangeably.

I will explain what the bidirectional streaming RPC call type is, how it is declared and implemented in a gRPC service, how it can be used within a gRPC client, and finally, when to use it in application development scenarios.

In a previous post I explained what gRPC services are, showed how to implement a basic gRPC service using the gRPC ASP.NET Core project templates, showed how to implement a basic gRPC console client that connects to the service, and then executed a unary (single request, single response) RPC call to retrieve data from the gRPC service.

In other posts I defined and showed how to implement the gRPC server streaming call type. In another post I defined and showed how to implement the gRPC client streaming call type.  

Some obvious real-world uses of an RPC bidirectional (duplex) stream include:

  • Client posts a stream of chat messages to a server and receives a video stream from a server.
  • Client posts a stream of data updates to a server and receives a stream of updated dashboard data from a server.
  • Client posts new and updated form field values to a server and receives form validation results from the server.
  • Client posts files for upload to a server and receives upload progress from the server.

I will first explain what a bidirectional streaming RPC call is.

Explaining the Bidirectional Streaming RPC Call

In this section I will elaborate and expand on the core definitions of a bidirectional streaming RPC call.

A bidirectional streaming RPC call is a single RPC method call that combines a server streaming RPC call and a client streaming RPC call. The call involves one connection to the RPC method and one completion. All the client and server messages are asynchronously dispatched and received.

A bidirectional RPC call involves the following calls on the client and server:

  1. Client reads messages from an RPC server through a response stream.
  2. Client writes messages to an RPC server through a request stream.
  3. Server reads messages from an RPC client through a request stream.
  4. Server writes messages to an RPC client through a response stream.

There is more than one way to apply bidirectional message streaming.

  1. We can post a sequence of messages from the client, process the received messages, then for each received message, push a response message (that is dependent on each of the messages) back to the client stream.
  2. We can also post a batch of messages, do some processing on the batch, then push a single response message or a batch of responses back to the client stream.

gRPC preserves the order of messages within each stream, and the request and response streams operate independently of each other. Provided our asynchronous tasks read and write each stream in order, the incoming and outgoing message data stays consistent.
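The two approaches listed above can be sketched without any gRPC dependency. The following is a minimal illustration only: it assumes an in-memory Channel as a stand-in for the request stream, and the message strings and the batch size of two are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// In-memory stand-in for the request stream (an assumption, not a gRPC type).
var requests = Channel.CreateUnbounded<string>();
var perMessageReplies = new List<string>();
var batchReplies = new List<string>();

// "Client": post four messages, then complete the request stream.
foreach (var action in new[] { "Loan", "Return", "Loan", "Loan" })
    await requests.Writer.WriteAsync(action);
requests.Writer.Complete();

// "Server": read until the request stream completes.
var batch = new List<string>();
await foreach (var action in requests.Reader.ReadAllAsync())
{
    // Approach 1: one reply for each request message.
    perMessageReplies.Add($"Processed {action}");

    // Approach 2: collect requests and reply once per batch of two.
    batch.Add(action);
    if (batch.Count == 2)
    {
        batchReplies.Add($"Processed batch: {string.Join(", ", batch)}");
        batch.Clear();
    }
}

Console.WriteLine(string.Join(Environment.NewLine, perMessageReplies));
Console.WriteLine(string.Join(Environment.NewLine, batchReplies));
```

In a real duplex method the replies would be written to the response stream rather than collected in lists, but the choice between replying per message and replying per batch is the same.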

I will now explain how the messages flow within a typical gRPC bidirectional streaming call.

A Typical gRPC Bidirectional Streaming Call

A bidirectional streaming gRPC client call of the first type of duplexing consists of the following actions on the client and server sides:

  1. (Client) Open a connection to the gRPC service.
  2. (Client) Start the call to the gRPC duplex method.
  3. (Client) In a background task, receive messages from the gRPC server method in a while loop, using ResponseStream.MoveNext() and the Current property.
  4. (Client) Send messages in a while loop to the gRPC server method with RequestStream.WriteAsync().
  5. (Server) Receive each client message through the request stream parameter of type IAsyncStreamReader.
  6. (Server) Send messages to the client through the response stream parameter of type IServerStreamWriter.
  7. (Server) In a background task, run a loop that retrieves each message from the client request stream with ReadAllAsync() (or with MoveNext() and the Current property).
  8. (Server) While the request stream is still delivering messages, send messages back through the response stream with WriteAsync().
  9. (Client) Set the request stream as complete with a call to RequestStream.CompleteAsync().
  10. (Client) Await completion of the background task that reads the response stream.

The above looks like quite a complex sequence of events. I will illustrate the above with a diagram and later will provide some code to give a clearer picture of how the asynchronous message requests and responses are dispatched back and forth between the gRPC client and server.

The following diagram shows that after the channel and RPC library connection are established, an initial request starts the stream of messages pushed from the client to the server. After the request is completed, the result from the server is retrieved by the client.

The above consists essentially of data (messages) being streamed from a client to a server, and from the server back to the client, in the same RPC connection to the gRPC service library. This is a combination of the server and client gRPC call patterns running simultaneously.
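Before looking at the real gRPC code, the message flow can be sketched in plain .NET. This is only a rough simulation under stated assumptions: two in-memory Channel instances stand in for the request and response streams, and the "book N" messages are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// In-memory stand-ins for the two gRPC streams (illustration only).
var requestStream = Channel.CreateUnbounded<string>();
var responseStream = Channel.CreateUnbounded<string>();
var received = new List<string>();

// Server: read each request and write one response, then complete the
// response stream once the request stream has completed.
var serverTask = Task.Run(async () =>
{
    await foreach (var request in requestStream.Reader.ReadAllAsync())
        await responseStream.Writer.WriteAsync($"Processed {request}");
    responseStream.Writer.Complete();
});

// Client: read responses in a background task...
var readTask = Task.Run(async () =>
{
    await foreach (var response in responseStream.Reader.ReadAllAsync())
        received.Add(response);
});

// ...while writing requests on the main flow.
for (int book = 1; book <= 3; book++)
    await requestStream.Writer.WriteAsync($"book {book}");

// Completing the request stream lets the server finish, which in turn
// ends the client's read loop.
requestStream.Writer.Complete();
await readTask;
await serverTask;

Console.WriteLine(string.Join(Environment.NewLine, received));
```

The shape is the same as in the gRPC version: the client completes its request stream first, then awaits the background reader, which ends only when the server has finished writing.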

I will now show how to implement gRPC bidirectional streaming calls.

Implementation of a gRPC Service with Duplex Streaming

Below is our RPC Protobuf IDL file that defines the service and operation contracts, including the data (message) contracts. I will explain the definitions later.

gRPC Server

syntax = "proto3";

option csharp_namespace = "GrpcBookLoanService";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

package library;

// The library service definition.
service Library {
  // Returns a matching library - unary call
  rpc GetLibrary(LibraryRequest) returns (LibraryReply);

  // Returns open libraries given time of day - server streaming call
  rpc StreamLibraryStatusFromServer(LibraryRequest) returns (stream LibraryStatusReply);

  // Receives client request messages, each containing a book return or borrow action - client streaming call
  rpc StreamingTransferActionFromClient (stream LibraryBookTransferRequest) returns (LibraryTransferReply);


  // Receives and sends messages containing borrowing/return actions simultaneously - bidirectional streaming
  rpc StreamingTransferActionDuplex (stream LibraryBookTransferRequest) returns (stream LibraryBookTransferResponse);
}

// The request message containing the library id.
message LibraryRequest {
  int32 library_id = 1;
}

// The response message containing the library details.
message LibraryReply {
  string library_name = 1;
  string library_location = 2;
}

// The response message containing the libraries and open status.
message LibraryStatusReply {
  string library_name = 1;
  string library_location = 2;
  bool isOpen = 3;
  string status = 4;
}

// the request message containing a book return or borrow action.
message LibraryBookTransferRequest
{
  int32 library_id = 1;
  string book_id = 2;
  string transfer_action = 3; 
  google.protobuf.Timestamp date_action = 4; 
}

// the response message containing a book return or borrow action.
message LibraryBookTransferResponse
{
  int32 library_id = 1;
  string book_id = 2;
  string transfer_action = 3; 
  google.protobuf.Timestamp date_action = 4; 
  string message_response = 5;
}

// The response message containing the transfer completion status.
message LibraryTransferReply {
  int32 number_returned = 1;
  int32 number_borrowed = 2;
  google.protobuf.Timestamp date_completed = 3; 
}

As we did for the unary, server and client streaming gRPC call types, we add an extra RPC method definition to provide duplex streaming calls to the service library definition in the Protobuf IDL file.

Our service library contains the definition of the operation contract for the bidirectional streaming RPC method:

// Receives and sends messages containing borrowing/return actions simultaneously - 
// bidirectional streaming
rpc StreamingTransferActionDuplex (stream LibraryBookTransferRequest) returns (stream LibraryBookTransferResponse);

Unlike a server streaming call, which takes a single initial request and returns a stream in one direction, a bidirectional streaming call works in both directions. Once the client has established the connection, request messages are pushed asynchronously through the client request stream, and response messages can be returned from the server to the gRPC client before the request stream completes.

As we did for server streamed calls, we can apply configurations (which I will explain later) to the initially established RPC connection to the duplex stream call. These determine how long the bidirectional stream can run (its duration) before the call returns, and whether the client can end the stream on demand.

The implementation of the library service (extended to include the duplex stream RPC call) is below:

LibraryService.cs

using System;
using System.Threading.Tasks;
using Grpc.Core;
using GrpcBookLoanService;
using Microsoft.Extensions.Logging;

namespace GrpcBookLoanService.Services
{
    public class LibraryService : Library.LibraryBase
    {
        private readonly ILogger<LibraryService> _logger;
        public LibraryService(ILogger<LibraryService> logger)
        {
            _logger = logger;
        }

        public LibraryReply GetLibraryDetails(int id)
        {
            string slibrary_name = String.Empty;
            string slibrary_location = String.Empty;

            switch (id)
            {
                case 1:
                    slibrary_name = "NSW State Library";
                    slibrary_location = "Sydney";
                    break;
                case 2:
                    slibrary_name = "Eastwood Library";
                    slibrary_location = "Eastwood";
                    break;
                case 3:
                    slibrary_name = "Chatswood Library";
                    slibrary_location = "Chatswood";
                    break;
                default:
                    slibrary_name = "N/A";
                    slibrary_location = "N/A";
                    break;
            }

            return new LibraryReply
            {
                LibraryName = slibrary_name,
                LibraryLocation = slibrary_location
            };
        }

        public LibraryStatusReply GetLibraryStatusDetails(int id)
        {
            LibraryReply reply = GetLibraryDetails(id);

            // The library is treated as open from 09:00 up to midnight (local time).
            DateTime dt = DateTime.Now;
            bool openStatus = (dt.Hour >= 9) && (dt.Hour <= 23);

            Random random = new Random();
            int statusValue = random.Next(1, 4);
            string status = String.Empty;
            switch (statusValue)
            {
                case 1:
                    status = "Quiet";
                    break;
                case 2:
                    status = "Normal";
                    break;
                case 3:
                    status = "Busy";
                    break;
                default:
                    status = "Normal";
                    break;
            }

            return new LibraryStatusReply
            {
                LibraryName = reply.LibraryName,
                LibraryLocation = reply.LibraryLocation,
                IsOpen = openStatus,
                Status = (openStatus == true ? status: "Quiet")
            };
        }

        // Example of a unary gRPC call.
        public override Task<LibraryReply> GetLibrary(LibraryRequest request, ServerCallContext context)
        {
            LibraryReply reply = GetLibraryDetails(request.LibraryId);

            return Task.FromResult(new LibraryReply
            {
                LibraryName = reply.LibraryName,
                LibraryLocation = reply.LibraryLocation
            });
        }

        // Example of a server streaming gRPC call.
        public override async Task StreamLibraryStatusFromServer(LibraryRequest request,
            IServerStreamWriter<LibraryStatusReply> responseStream, ServerCallContext context)
        {
            while (!context.CancellationToken.IsCancellationRequested)
            {
                LibraryStatusReply libraryStatusReply = GetLibraryStatusDetails(request.LibraryId);
                await responseStream.WriteAsync(libraryStatusReply);
                await Task.Delay(TimeSpan.FromSeconds(30), context.CancellationToken);
            }
        }

        // Example of a client streaming gRPC call.
        public override async Task<LibraryTransferReply> StreamingTransferActionFromClient(
    IAsyncStreamReader<LibraryBookTransferRequest> requestStream, ServerCallContext context)
        {
            int nLoaned = 0, nReturned = 0;
            Google.Protobuf.WellKnownTypes.Timestamp dtCompleted = new Google.Protobuf.WellKnownTypes.Timestamp();

            while (!context.CancellationToken.IsCancellationRequested && 
                    await requestStream.MoveNext())
            {
                var message = requestStream.Current;
                // Add each record to a data reporting service ...
                // message.LibraryId
                // message.BookId
                // message.TransferAction
                // message.DateAction
                if (message.TransferAction == "Loan")
                    nLoaned++;
                if (message.TransferAction == "Return")
                    nReturned++;
                dtCompleted = message.DateAction;
            }
            await Task.Delay(TimeSpan.FromSeconds(10), context.CancellationToken);
            return new LibraryTransferReply
            {
                NumberBorrowed = nLoaned,
                NumberReturned = nReturned,
                DateCompleted = dtCompleted
            };
        }


        // Example of a duplex streaming gRPC call.
        public override async Task StreamingTransferActionDuplex(
            IAsyncStreamReader<LibraryBookTransferRequest> requestStream,
            IServerStreamWriter<LibraryBookTransferResponse> responseStream,
            ServerCallContext context)
        {
            int nLoaned = 0, nReturned = 0;

            // Read requests in a background task. The loop ends when the
            // client completes the request stream.
            var readTask = Task.Run(async () =>
            {
                await foreach (var message in requestStream.ReadAllAsync())
                {
                    if (context.CancellationToken.IsCancellationRequested)
                        break;

                    if (message.TransferAction == "Loan")
                        nLoaned++;
                    if (message.TransferAction == "Return")
                        nReturned++;

                    // Update some server data..
                    Console.WriteLine($"Server request stream received {nLoaned} loan messages.");
                    Console.WriteLine($"Server request stream received {nReturned} return messages.");
                    Console.WriteLine();

                    // Write the response for this request in a child task.
                    // Task.Run unwraps the async lambda, so awaiting writeTask
                    // waits for WriteAsync itself to finish.
                    var writeTask = Task.Run(async () =>
                    {
                        Console.WriteLine("Start writing message to response stream...");

                        DateTime currentServerDateTime = DateTime.UtcNow;

                        await responseStream.WriteAsync(new LibraryBookTransferResponse
                        {
                            BookId = message.BookId,
                            LibraryId = message.LibraryId,
                            TransferAction = message.TransferAction,
                            DateAction =
                                Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(currentServerDateTime),
                            MessageResponse =
                                $"Library {message.LibraryId} {message.TransferAction} desk has processed book {message.BookId}."
                        });

                        Console.WriteLine("Finished writing message to response stream.");
                    });

                    try
                    {
                        // Wait for the write to finish before reading the next
                        // request, so that only one write is pending at a time.
                        await writeTask;
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Exception from response writer task: {ex.Message}");
                        throw;
                    }
                }
            });

            // Wait for the reader to drain the request stream before ending the call.
            await readTask;

            Console.WriteLine("All request stream messages processed.");

            await Task.Delay(TimeSpan.FromSeconds(10), context.CancellationToken);
        }
    }
}

As we did for the unary, server and client streaming gRPC call types, we implement the extra RPC method that was added to the service library definition in the Protobuf IDL file.

Our service class overrides the generated method for the bidirectional duplex streaming RPC call:

public override async Task StreamingTransferActionDuplex(
    IAsyncStreamReader<LibraryBookTransferRequest> requestStream,
    IServerStreamWriter<LibraryBookTransferResponse> responseStream,
    ServerCallContext context)

The method takes a stream of message requests of type LibraryBookTransferRequest:

message LibraryBookTransferRequest
{
  int32 library_id = 1;
  string book_id = 2;
  string transfer_action = 3; 
  google.protobuf.Timestamp date_action = 4; 
}

It asynchronously returns a stream of message responses of type LibraryBookTransferResponse:

// the response message containing a book return or borrow action.
message LibraryBookTransferResponse
{
  int32 library_id = 1;
  string book_id = 2;
  string transfer_action = 3; 
  google.protobuf.Timestamp date_action = 4; 
  string message_response = 5;
}

Unlike server and client streaming calls, a bidirectional duplex stream, once the client connection is established, has messages pushed (sent/written) and pulled (received/read) asynchronously through both the request and response streams. Within the RPC service method, the request message stream is read in a parent background task. For each request, a child background task, which the parent waits on, pushes a record back to the client through the response stream. This is achieved in the following code block (see the earlier code for full details):

var readTask = Task.Run(async () =>
{
      await foreach (var message in requestStream.ReadAllAsync())
      {
            …
            var writeTask = Task.Run(async () =>
            {
                await responseStream.WriteAsync(new LibraryBookTransferResponse
                {
                    …
                });
                …
            });

            try
            {
                  await writeTask;
            }
            catch (Exception ex)
            {
                  …
            }
      }
});

await readTask;

The above is used in cases where we want the content of each response message to depend on the corresponding request message.

Meanwhile, on the RPC client, messages are read from the response stream within a background task and records are pushed to the RPC service through the request stream. The bidirectional stream of messages ends after the RPC client has stopped sending messages to the server and the background task in the RPC client has finished processing messages from the server. Without a finite stream of messages, the bidirectional stream can only end with a cancellation request from the RPC client or when the deadline is reached.

As we did for server and client streamed calls, we can apply configurations (which I explained in the previous posts) to the initially established RPC connection to the duplex stream call. These determine how long the stream can run (its duration) before the call returns, and whether the client can end the stream on demand (with a cancellation token request).

I explained much of the implementation of the gRPC client console application in the previous posts.

I will show the code excerpt for the calls to the bidirectional duplex service RPC call below:

Implementation of a gRPC Client that Consumes the gRPC Duplex Streaming Call

The following code from the gRPC console client application demonstrates how to establish a duplex RPC stream, send (push) messages to and retrieve (pull) messages from the bidirectional RPC stream, and complete the duplex stream.

…
var serverCall = libraryClient.StreamingTransferActionDuplex(
        null,
        DateTime.UtcNow.AddSeconds(streamRunningDuration),
        cancellationTokenSource.Token);

try
{
      while (!cancellationTokenSource.IsCancellationRequested)
      {
            Console.WriteLine("Receive message stream..");

            var readStreamTask = Task.Run(async () =>
            {
                while (await serverCall.ResponseStream.MoveNext(cancellationTokenSource.Token))
                {
                    var message = serverCall.ResponseStream.Current;

                    // Echo messages received from the service
                    Console.WriteLine("Received following record from RPC server ...");
                    Console.WriteLine($"Local Date/Time  : {message.DateAction.ToDateTime()}");
                    Console.WriteLine($"BookId           : {message.BookId}");
                    Console.WriteLine($"LibraryId        : {message.LibraryId}");
                    Console.WriteLine($"Action           : {message.TransferAction}");
                    Console.WriteLine($"Response Message : {message.MessageResponse}");
                    Console.WriteLine("-----------------------------");
                    Console.WriteLine();
                }
            });

            Console.WriteLine("Send message stream..");

            bool keepSendingMessages = true;

            while (keepSendingMessages)
            {
                for (int i = 0; i < 5; i++)
                {
                    DateTime currentServerDateTime = DateTime.UtcNow;
                    DateTime currentDateTime = currentServerDateTime.ToLocalTime();
                    Random random = new Random();
                    int randomBook = random.Next(1, 10);
                    int randomAction = random.Next(1, 3);
                    string action = (randomAction == 1 ? "Loan" : "Return");

                    Console.WriteLine("Pushing following record to RPC server ...");
                    Console.WriteLine($"Local Date/Time  : {currentDateTime}");
                    Console.WriteLine($"Server Date/Time : {currentServerDateTime}");
                    Console.WriteLine($"BookId           : {randomBook}");
                    Console.WriteLine($"LibraryId        : {libraryId}");
                    Console.WriteLine($"Action           : {action}");
                    Console.WriteLine("-----------------------------");
                    Console.WriteLine();

                    await serverCall.RequestStream.WriteAsync(new LibraryBookTransferRequest
                    {
                        BookId = randomBook.ToString(),
                        LibraryId = libraryId,
                        TransferAction = action,
                        DateAction = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(currentServerDateTime)
                    });
                }

                Console.WriteLine("Continue sending more messages to the client stream (Y/N)?");
                ConsoleKeyInfo userInput = Console.ReadKey();
                char key = userInput.KeyChar;
                Console.WriteLine();
                Console.WriteLine($"Option selected: {key}");
                if ((key == 'N') || (key == 'n'))
                {
                    keepSendingMessages = false;

                    Console.WriteLine("Disconnecting request stream");

                    await serverCall.RequestStream.CompleteAsync();
                    await readStreamTask;

                    Console.WriteLine("Cancelling server streaming..");
                    cancellationTokenSource.Cancel();
                }
            }
      }
}
catch (Grpc.Core.RpcException gex)
{
        if (gex.StatusCode == Grpc.Core.StatusCode.DeadlineExceeded)
            Console.WriteLine($"The gRPC call exceeded the specified deadline of {streamRunningDuration} seconds.");
        else
            Console.WriteLine($"The gRPC service returned the following error: {gex.Message}");
        cancellationTokenSource.CancelAfter(3000);
}
Console.WriteLine("Duplex streaming completed.");
cancellationTokenSource.Dispose();

The above client pushes message records, in batches of 5 and with some randomly generated fields, to the RPC duplex request stream, and reads message records asynchronously from the RPC duplex response stream in a background task.

The above code also allows the streaming task to be cancelled by the client with the cancellation token.

Before we (the RPC client) can receive any messages from the RPC service, records must first have been pushed from the RPC client to the RPC service.

The next stream of messages consists of the responses from the RPC server.

After the last message has been pushed to the RPC server and the final message has been read in from the RPC server, the background processing thread ends. In the outer loop we manually trigger the cancellation token request.

For each record that is pushed to the RPC service, the running count of each type of record (Loan or Return) is output on the RPC server side.

After the messages have been written to the request stream, the asynchronous request stream is completed with the following call:

await serverCall.RequestStream.CompleteAsync();

We then wait for the response stream to finish by awaiting the background read task:

await readStreamTask;

The duplex stream call is then either re-run or cancelled by the user.

I explained how exceptions and deadlines in RPC calls are handled in the previous post.

With a gRPC duplex streaming call, we would expect response stream messages to follow request stream messages. One restriction with the duplex streaming RPC call is that a stream reader supports only one read at a time and a stream writer only one write at a time. Reading from the request stream and writing to the response stream can run concurrently, but they must then run in separate tasks (threads). If the reads and writes happen in sequence, they can share the same thread.
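When several tasks need to write to the same response stream, one way to respect the one-write-at-a-time restriction is to guard the writer with a SemaphoreSlim. The sketch below is an assumption-laden illustration, not code from the service above: WriteSerializedAsync is a hypothetical helper, and a short Task.Delay stands in for responseStream.WriteAsync().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var writeLock = new SemaphoreSlim(1, 1);
var writtenMessages = new List<string>();
int writesInFlight = 0;
int maxWritesInFlight = 0;

// Hypothetical helper: lets several tasks share one writer safely by
// allowing only one write to be pending at a time.
async Task WriteSerializedAsync(string message)
{
    await writeLock.WaitAsync();
    try
    {
        writesInFlight++;
        maxWritesInFlight = Math.Max(maxWritesInFlight, writesInFlight);

        // Stands in for: await responseStream.WriteAsync(message);
        await Task.Delay(10);
        writtenMessages.Add(message);

        writesInFlight--;
    }
    finally
    {
        writeLock.Release();
    }
}

// Five tasks try to write at the same time.
var writers = Enumerable.Range(1, 5)
    .Select(i => Task.Run(() => WriteSerializedAsync($"response {i}")));
await Task.WhenAll(writers);

Console.WriteLine($"Written: {writtenMessages.Count}, max concurrent writes: {maxWritesInFlight}");
// prints "Written: 5, max concurrent writes: 1"
```

The order in which the five responses are written is not fixed here. If responses must follow the order of the requests, awaiting each write before reading the next request is the simpler choice.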

The other consideration is the message processing pattern we use in our RPC duplex method. Do we read the request stream in a background thread and wait for it to process all request messages before running a separate loop (or thread) that writes messages through the response stream, or do we read the request stream in a background thread and spawn a thread for each message written through the response stream?

In this post I have explained what a bidirectional RPC streaming call is and have demonstrated how to implement a bidirectional duplex stream gRPC .NET Core service.

That is all for today’s post.

I hope you have found the above post useful and informative.
