Welcome to today’s post.
I will discuss how to use Azure Service Bus cloud message broker within a Microservice API application or Web application using .NET Core.
In a previous post I showed how to set up an Azure Service Bus namespace and create a queue within the Azure portal.
Overview of Azure Service Bus
To summarize, Azure Service Bus is a cloud-based messaging service providing queues and topics with publish/subscribe semantics and rich features. This fully managed service is available in multi or single tenant configurations with no servers to manage or licenses to buy.
Use Service Bus to:
- Build reliable and elastic cloud apps with messaging.
- Protect your application from temporary peaks.
- Distribute messages to multiple independent backend systems.
- Decouple your applications from each other.
- Scale out ordered messaging to multiple readers.
Below is a common architecture that is used to decouple two services with a service bus queue:
I will first give a brief overview of how to install the Azure Service Bus client library into the development environment. I will then show how to integrate Service Bus queuing into a .NET Core application, and finally how to send messages to, and receive messages from, the queue.
Installation of Azure Service Bus Package Libraries
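In our .NET Core applications, the NuGet package Microsoft.Azure.ServiceBus is a prerequisite. It can be installed through the NuGet Package Manager in Visual Studio or by running dotnet add package Microsoft.Azure.ServiceBus from the project folder.
In the next section I will show how we configure our .NET Core application by establishing a connection to the Service Bus and creating an instance of the queue client.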
Configuring an Application to connect to the Azure Service Bus
Before we can send messages to our Azure Service Bus queue, we will need to undertake some preliminaries.
The following steps need to be implemented:
- Create an instance of a queue client, IQueueClient, with the connection string and queue name.
- Send a message using IQueueClient.SendAsync().
- Close the queue connection using IQueueClient.CloseAsync().
The following code snippet implements a helper that initializes a queue client and includes a method to send a message to the queue:
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

public class MessageBusHelper
{
    // Placeholder values: substitute your own connection string and queue name.
    const string ServiceBusConnectionString = "Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxx";
    const string QueueName = "xxxxxxxxxxxxxxxx";

    public IQueueClient queueClient;
    public Func<Message, CancellationToken, Task> ReceiveMessageHandler;

    // Creates the queue client, then sends a single message.
    public async Task InitSendMessages(object message)
    {
        queueClient = new QueueClient(ServiceBusConnectionString, QueueName);

        // Send the message.
        await SendMessagesAsync(message);
    }

    public async Task SendMessagesAsync(object newmessage)
    {
        try
        {
            // Serialize the payload and wrap it in a Service Bus message
            byte[] messagebytes = SerializationHelpers.SerializeToByteArray(newmessage);
            var message = new Message(messagebytes);

            // Send the message to the queue
            await queueClient.SendAsync(message);

            // Close the connection once the message has been sent
            await queueClient.CloseAsync();
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
        }
    }
    ..
}
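The helper above hard-codes the connection string and queue name. One possible alternative, shown here only as a minimal sketch, is to read them from environment variables so that the shared access key stays out of source control. The class name ServiceBusSettings and the variable names SERVICEBUS_CONNECTION_STRING and SERVICEBUS_QUEUE_NAME are assumptions made for illustration; they are not part of the original helper or of the Service Bus library.

```csharp
using System;

// Hypothetical settings holder; class and variable names are illustrative only.
public sealed class ServiceBusSettings
{
    public string ConnectionString { get; }
    public string QueueName { get; }

    public ServiceBusSettings(string connectionString, string queueName)
    {
        // Fail early with a clear message if either value is missing.
        if (string.IsNullOrWhiteSpace(connectionString))
            throw new ArgumentException("Connection string is required.", nameof(connectionString));
        if (string.IsNullOrWhiteSpace(queueName))
            throw new ArgumentException("Queue name is required.", nameof(queueName));

        ConnectionString = connectionString;
        QueueName = queueName;
    }

    // Reads both values from environment variables (the names are assumptions).
    public static ServiceBusSettings FromEnvironment()
    {
        return new ServiceBusSettings(
            Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING"),
            Environment.GetEnvironmentVariable("SERVICEBUS_QUEUE_NAME"));
    }
}
```

The helper could then build its client with new QueueClient(settings.ConnectionString, settings.QueueName) instead of using the two constants.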
Before a message can be sent, it needs to be serialized to a byte array. A useful helper method SerializeToByteArray() is shown below:
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

public static class SerializationHelpers
{
    public static byte[] SerializeToByteArray(this object obj)
    {
        if (obj == null)
        {
            return null;
        }
        var bf = new BinaryFormatter();
        using (var ms = new MemoryStream())
        {
            bf.Serialize(ms, obj);
            return ms.ToArray();
        }
    }
    ..
}
A useful method for deserializing an incoming queue message will be provided once I discuss the receiving of messages.
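One caveat: BinaryFormatter is flagged as obsolete from .NET 5 onward, and Microsoft's guidance is to avoid it with untrusted input. As a hedged alternative, the sketch below shows the same pair of helpers built on System.Text.Json, assuming .NET Core 3.0 or later. The names JsonSerializationHelpers and SampleCommand are hypothetical and not from the original code. Note that System.Text.Json serializes public properties by default, so any public fields on a message class would need to become properties.

```csharp
using System.Text.Json;

// Hypothetical JSON-based counterpart to SerializationHelpers (not the original author's code).
public static class JsonSerializationHelpers
{
    // Serializes an object to UTF-8 encoded JSON bytes; returns null for a null input.
    public static byte[] SerializeToJsonBytes<T>(T obj) where T : class
    {
        return obj == null ? null : JsonSerializer.SerializeToUtf8Bytes(obj);
    }

    // Deserializes UTF-8 encoded JSON bytes back into T; returns null for a null input.
    public static T DeserializeFromJsonBytes<T>(byte[] bytes) where T : class
    {
        return bytes == null ? null : JsonSerializer.Deserialize<T>(bytes);
    }
}

// Illustrative message type; only its public properties are serialized.
public class SampleCommand
{
    public string CommandType { get; set; }
    public int BookId { get; set; }
}
```

With this approach the message classes do not need the [Serializable] attribute, and the message body is human-readable JSON text.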
Sending Messages to an Azure Service Bus Queue
In our calling class we can set up our message and send it to the service bus as shown:
BookRecordSyncModel bookRecordSyncData = new BookRecordSyncModel()
{
    CommandType = "Update",
    bookData = vm
};
await messageBusHelper.InitSendMessages(bookRecordSyncData);
In addition, because the message is serialized to binary before it is posted to the service bus queue, the message class and every class it contains must be marked as serializable. This can be done as shown:
using System;
using BookLoan.Models;

namespace BookLoanMicroservices.Models.MessagingModels
{
    [Serializable]
    public class BookRecordSyncModel
    {
        public string CommandType { get; set; }
        public BookViewModel bookData;
    }
}
The model for the book data carried in each message is shown below:
using System;

namespace BookLoan.Models
{
    [Serializable]
    public class BookViewModel
    {
        public int ID { get; set; }
        ..
    }
}
If you fail to mark a class as serializable, the following error will occur:
"Type '[class name]' in Assembly '[assembly namespace], Version=1.0.0.0, Culture=neutral, PublicKeyToken=null' is not marked as serializable."
Before a message is dispatched, the Active messages can be seen in the service bus overview in the Azure portal:
After a message is sent to the service bus, the number of active messages within our queue in Azure will increase as follows:
In addition, the message requests (posts) and when they were posted can be seen in the timing graph (left side) as shown:
In the next section I will show how to receive messages from an Azure Service Bus queue.
Receiving messages from an Azure Service Bus Queue
To receive messages from an Azure Service Bus queue, the following steps need to be implemented:
- Create an instance of a queue client, IQueueClient, with the connection string and queue name.
- Register an asynchronous message handler using IQueueClient.RegisterMessageHandler().
- Close the queue connection using IQueueClient.CloseAsync().
The following code snippet shows how this is done:
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

public class MessageBusHelper
{
    // Placeholder values: substitute your own connection string and queue name.
    const string ServiceBusConnectionString = "Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxx";
    const string QueueName = "xxxxxxxxxxxxxxxx";

    public IQueueClient queueClient;
    public Func<Message, CancellationToken, Task> ReceiveMessageHandler;

    // Set by the message handler in the calling class once a message has been processed.
    public bool IsMessageReceived;

    // Creates the queue client and starts listening for messages.
    public Task InitReceiveMessages()
    {
        queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
        RegisterOnMessageHandlerAndReceiveMessages();
        return Task.CompletedTask;
    }

    public void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Process one message at a time
            MaxConcurrentCalls = 1,
            // The handler must call CompleteAsync() itself
            AutoComplete = false
        };

        // Register the function that will process messages
        queueClient.RegisterMessageHandler(ReceiveMessageHandler, messageHandlerOptions);
    }

    // Logs any exception raised while receiving or handling a message.
    public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine("Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }
}
Note: the helper method RegisterOnMessageHandlerAndReceiveMessages() deliberately does not close the queue client after calling IQueueClient.RegisterMessageHandler(). RegisterMessageHandler() returns immediately and messages are processed in the background, so if IQueueClient.CloseAsync() runs before the message handler has completed, we will get the following exception:
Message handler encountered an exception System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'MessageReceiver with Id 'MessageReceiver4bookloanasbqueue' has already been closed. Please create a new MessageReceiver.'.
To rectify this synchronization problem we can either include a manual reset event within the helper and wait on that event with a timeout in our calling class, or include a small delay after calling the helper method InitReceiveMessages().
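The manual reset event option can be sketched independently of Service Bus. The following is a minimal illustration using ManualResetEventSlim from the base class library; the class MessageReceivedSignal and its method names are hypothetical and are not part of the helper shown earlier.

```csharp
using System;
using System.Threading;

// Hypothetical helper illustrating the signalling pattern; not part of the Service Bus SDK.
public sealed class MessageReceivedSignal : IDisposable
{
    private readonly ManualResetEventSlim _received = new ManualResetEventSlim(false);

    // Call at the end of the message handler, after the message has been completed.
    public void MarkReceived() => _received.Set();

    // Blocks until MarkReceived() is called or the timeout elapses.
    // Returns true if the signal arrived in time, false on timeout.
    public bool WaitForMessage(TimeSpan timeout) => _received.Wait(timeout);

    public void Dispose() => _received.Dispose();
}
```

Under these assumptions, the message handler would call MarkReceived() after CompleteAsync(), and the calling class would call WaitForMessage() with a suitable timeout before calling CloseAsync(). Because WaitForMessage() blocks the calling thread, a fully asynchronous caller may prefer an awaitable equivalent such as TaskCompletionSource.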
Our helper class includes a delegate function:
public Func<Message, CancellationToken, Task> ReceiveMessageHandler;
which can be assigned to a handler either within the helper class or within the calling class.
In our calling class we can define an asynchronous method to process queue messages that are received. An example is shown below:
public async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    // Log the message. The body is binary-serialized, so report its size rather than decoding it as text.
    Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body length:{message.Body.Length} bytes");

    // Rebuild the model from the message body
    BookRecordSyncModel bookRecordSyncModel =
        SerializationHelpers.Deserialize<BookRecordSyncModel>(message.Body);

    if (bookRecordSyncModel.CommandType == "Update")
    {
        // run some task or process that inserts the book data
        // into another data store.
        ..
    }

    // AutoComplete is false, so complete the message explicitly to remove it from the queue
    await messageBusHelper.queueClient.CompleteAsync(
        message.SystemProperties.LockToken);
    messageBusHelper.IsMessageReceived = true;
}
The helper method for deserializing messages is shown below:
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

public static class SerializationHelpers
{
    ..
    public static T Deserialize<T>(this byte[] byteArray)
        where T : class
    {
        if (byteArray == null)
        {
            return null;
        }
        // Wrap the bytes in a stream positioned at the start
        using (var memStream = new MemoryStream(byteArray))
        {
            var binForm = new BinaryFormatter();
            var obj = (T)binForm.Deserialize(memStream);
            return obj;
        }
    }
}
Below is an example of how to process the service bus queue messages with a small delay before closing the queue client connection:
messageBusHelper.ReceiveMessageHandler += ProcessMessagesAsync;
await messageBusHelper.InitReceiveMessages();

// Give the handler time to process the message before closing
await Task.Delay(2000);

if (!messageBusHelper.queueClient.IsClosedOrClosing)
{
    await messageBusHelper.queueClient.CloseAsync();
}
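A fixed delay is either too long or too short depending on how long the handler takes. As one hedged alternative, the calling class could poll the IsMessageReceived flag that the handler sets, up to a maximum wait. The sketch below is a generic polling helper; the class name Polling and the method WaitUntilAsync are hypothetical and not part of the original code.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical polling helper; the names are illustrative only.
public static class Polling
{
    // Re-checks the condition every pollInterval until it is true or the timeout elapses.
    // Returns true if the condition became true, false if the timeout was reached first.
    public static async Task<bool> WaitUntilAsync(
        Func<bool> condition, TimeSpan timeout, TimeSpan pollInterval)
    {
        var stopwatch = Stopwatch.StartNew();
        while (!condition())
        {
            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }
            await Task.Delay(pollInterval);
        }
        return true;
    }
}
```

In the calling class, the line await Task.Delay(2000); could then be replaced with something like await Polling.WaitUntilAsync(() => messageBusHelper.IsMessageReceived, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100)); where the timeout and interval values are arbitrary examples.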
After messages are received from the queue, the queue overview in the portal will display the outgoing (received) message as shown:
The queue timing graph will display the outgoing message in the same way as it displayed the posted message earlier.
After a message is received and completed, it is removed from the queue, and the Active Message Count in the queue overview decreases by one as shown:
That’s all for today’s post.
I hope you found this post useful and informative, and that it has given you a taste of how Azure Service Bus queues can be used to queue and process messages from your applications.
Andrew Halil is a blogger, author and software developer with expertise in many areas of the information technology industry, including full-stack web and native cloud-based development, test-driven development and DevOps.