Welcome to today’s post.
I will be discussing how microservices can communicate through integration events using Azure Service Bus.
I will also show how to use Azure Service Bus and integration events to continuously update data in other microservices.
Before I give an example of how the communication is implemented, I will explain what integration events are.
What is an Integration Event?
An integration event is an event-based form of communication between microservices.
When an event of significance occurs, such as a change to important data within a microservice data store, the microservice publishes an event. Instead of publishing this event directly to another microservice, which would create a dependency between our microservices, we send the event to a service bus queue. Any receiving microservice that subscribes to the queue then receives the event.
A common use for integration events is to synchronize data between two data sources that reside in two different microservices. The architecture for this scenario is shown below:
The steps taken to update data in the initial data store and to replicate into the destination data store are:
- Client application updates book data.
- Book Catalog API method UpdateBook() is called.
- Book Catalog API updates the source database table.
- Book Catalog API method UpdateBook() publishes change event to Service Bus Queue.
- Azure Function is triggered by the Service Bus.
- Azure Function calls Loan API UpdateBook() method.
- Loan API UpdateBook() method updates the destination database table.
An Example of an Integration Change Event
Below are the definitions of the integration events for new and changed records. These will be used to notify any event receivers that a book has been added or that a book's edition has changed:
using System;

namespace BookLoan.Catalog.API.Events
{
    // Raised when the edition of an existing book changes.
    [Serializable]
    public class BookInventoryChangedIntegrationEvent : IntegrationEvent
    {
        public int BookId { get; private set; }
        public string NewEdition { get; private set; }
        public string OldEdition { get; private set; }

        public BookInventoryChangedIntegrationEvent(int bookId, string newEdition,
            string oldEdition)
        {
            BookId = bookId;
            NewEdition = newEdition;
            OldEdition = oldEdition;
        }
    }

    // Raised when a new book is added to the catalog.
    [Serializable]
    public class BookInventoryNewIntegrationEvent : IntegrationEvent
    {
        public int BookId { get; private set; }
        public BookViewModel newBook { get; private set; }

        public BookInventoryNewIntegrationEvent(int bookId, BookViewModel newBookModel)
        {
            BookId = bookId;
            newBook = newBookModel;
        }
    }
}
The base integration event type is defined as shown:
using System;
using Newtonsoft.Json;

namespace BookLoan.Catalog.API.Events
{
    [Serializable]
    public class IntegrationEvent
    {
        // Used when a new event is raised: assigns a fresh Id and timestamp.
        public IntegrationEvent()
        {
            Id = Guid.NewGuid();
            CreationDate = DateTime.UtcNow;
        }

        // Used by Json.NET when the base type itself is deserialized.
        [JsonConstructor]
        public IntegrationEvent(Guid id, DateTime createDate)
        {
            Id = id;
            CreationDate = createDate;
        }

        [JsonProperty]
        public Guid Id { get; private set; }

        [JsonProperty]
        public DateTime CreationDate { get; private set; }
    }
}
Before we progress to look at the implementation, we will review our approach.
Relating Microservice Data Consistency and the CAP Theorem to our approach
The CAP theorem says that you cannot build a distributed database (or a microservice that owns its own model) that is at once continually available, strongly consistent, and tolerant to any partition. You must choose two of these three properties.
With microservice architectures, availability and partition tolerance are the usual choices, with eventual consistency accepted in place of strong consistency. In practice this means we can use a queue-based Publisher-Subscriber pattern with integration events as one method of bringing our distributed data stores back to a consistent state.
Another approach is the Event Sourcing pattern, which requires its own data store to hold domain events and replays those events against the target data stores to achieve consistency.
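To make eventual consistency concrete, here is a minimal in-memory sketch. It is not the post's implementation: `EditionChanged`, `EventualConsistencyDemo`, and the two dictionaries are hypothetical stand-ins for the integration event, the service bus queue, and the two microservice data stores. It only illustrates that the destination store lags behind the source until the queued events have been applied.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an integration event carrying a changed edition.
public sealed class EditionChanged
{
    public int BookId { get; }
    public string NewEdition { get; }

    public EditionChanged(int bookId, string newEdition)
    {
        BookId = bookId;
        NewEdition = newEdition;
    }
}

// In-memory stand-in for a queue sitting between two independent data stores.
public sealed class EventualConsistencyDemo
{
    private readonly Queue<EditionChanged> _queue = new Queue<EditionChanged>();

    public Dictionary<int, string> CatalogStore { get; } = new Dictionary<int, string>();
    public Dictionary<int, string> LoanStore { get; } = new Dictionary<int, string>();

    // Publisher side: commit to the source store, then enqueue the change.
    public void UpdateCatalog(int bookId, string edition)
    {
        CatalogStore[bookId] = edition;
        _queue.Enqueue(new EditionChanged(bookId, edition));
    }

    // Subscriber side: apply every pending event to the destination store.
    // Returns the number of events applied.
    public int DrainQueue()
    {
        int applied = 0;
        while (_queue.Count > 0)
        {
            EditionChanged change = _queue.Dequeue();
            LoanStore[change.BookId] = change.NewEdition;
            applied++;
        }
        return applied;
    }

    // True only when both stores hold the same edition for the book.
    public bool IsConsistent(int bookId)
    {
        return CatalogStore.TryGetValue(bookId, out string source)
            && LoanStore.TryGetValue(bookId, out string destination)
            && source == destination;
    }
}
```

Between `UpdateCatalog` and `DrainQueue` the two stores disagree, and that window is the price paid for keeping the services available and decoupled.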
Publishing Integration Events to the Azure Service Bus from a Microservice API
When an event is to be propagated through to other microservices, the source microservice will publish an integration event (such as the one described above) to a service bus queue.
An example of setting up and publishing an integration event is when we update a book. If the book's edition has changed, a change event is published to the service bus queue as shown below:
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

public class BookService : IBookService
{
    private readonly ApplicationDbContext _db;
    private readonly ILogger _logger;
    private readonly IEventBus _eventBus;

    public BookService(ApplicationDbContext db,
        ILogger<BookService> logger,
        IEventBus eventBus)
    {
        _db = db;
        _logger = logger;
        _eventBus = eventBus;
    }
    ..
    ..
    public async Task<BookViewModel> UpdateBook(int Id, BookViewModel vm)
    {
        BookViewModel book = await _db.Books.Where(a => a.ID == Id).SingleOrDefaultAsync();
        if (book != null)
        {
            // Keep the edition as it was before the update so a change can be detected.
            string originalEdition = book.Edition;
            book.Title = vm.Title;
            book.Author = vm.Author;
            book.Edition = vm.Edition;
            book.Genre = vm.Genre;
            book.ISBN = vm.ISBN;
            book.Location = vm.Location;
            book.YearPublished = vm.YearPublished;
            book.DateUpdated = DateTime.Now;
            _db.Update(book);
            await _db.SaveChangesAsync();

            // Publish an integration event only when the edition has changed.
            if (vm.Edition != originalEdition)
            {
                BookInventoryChangedIntegrationEvent bookInventoryChangedIntegrationEvent
                    = new BookInventoryChangedIntegrationEvent(Id, book.Edition, originalEdition);
                await _eventBus.Publish(bookInventoryChangedIntegrationEvent);
                await Task.Delay(2000);
            }
        }
        return book;
    }
    ..
}
A basic event bus implementation is shown below:
namespace BookLoan.Catalog.API.Events
{
    public class AzureEventBus : IEventBus
    {
        private MessageBusQueueHelper messageBusHelper;
        public Func<Message, CancellationToken, Task> ReceiveMessageHandler;

        public AzureEventBus()
        {
            messageBusHelper = new MessageBusQueueHelper();
        }

        public async Task Publish(IntegrationEvent @event)
        {
            await messageBusHelper.InitSendMessages(@event);
        }
        ..
    }
}
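The `IEventBus` interface itself is not shown in this post. A minimal sketch of what it might look like is given below, assuming only the `Publish` method used above. `InMemoryEventBus` is a hypothetical test double, and the `IntegrationEvent` class here is a simplified stand-in for the base type defined earlier. Injecting such a double in place of `AzureEventBus` makes it possible to unit test which events a service publishes without a real queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-in for the IntegrationEvent base class shown earlier.
public class IntegrationEvent
{
    public Guid Id { get; } = Guid.NewGuid();
    public DateTime CreationDate { get; } = DateTime.UtcNow;
}

// Assumed shape of the interface: only Publish is visible in the post.
public interface IEventBus
{
    Task Publish(IntegrationEvent @event);
}

// Hypothetical test double that records events instead of sending them.
public sealed class InMemoryEventBus : IEventBus
{
    private readonly List<IntegrationEvent> _published = new List<IntegrationEvent>();

    // Events in the order they were published.
    public IReadOnlyList<IntegrationEvent> Published => _published;

    public Task Publish(IntegrationEvent @event)
    {
        if (@event == null) throw new ArgumentNullException(nameof(@event));
        _published.Add(@event);
        return Task.CompletedTask;
    }
}
```

A test for `UpdateBook` could then assert that `Published` is empty when the edition is unchanged and contains one event when it differs.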
In our service bus helper, we can send out service bus messages in either binary or JSON format as follows:
public async Task SendMessagesAsync(object newmessage)
{
    try
    {
        if (_messageBusFormat == MessageBusFormat.Binary)
        {
            // Create a new message to send to the queue
            byte[] messagebytes = SerializationHelpers.SerializeToByteArray(newmessage);
            var message = new Message(messagebytes);
            await _queueClient.SendAsync(message);
            IsMessageSent = true;
        }
        else if (_messageBusFormat == MessageBusFormat.JSON)
        {
            // Serialize the object to JSON and send it as UTF-8 bytes
            var message = new Message(
                Encoding.UTF8.GetBytes(
                    JsonConvert.SerializeObject(newmessage)));
            message.ContentType = "application/json";
            await _queueClient.SendAsync(message);
            IsMessageSent = true;
        }
    }
    catch (Exception exception)
    {
        Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
    }
}
Here, newmessage is the instantiated integration event object.
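The JSON branch above and the receiving side depend on the same round trip: object to JSON, JSON to UTF-8 bytes, and back again. The sketch below isolates that step. `EditionChangedEvent` and `MessageBodyCodec` are hypothetical names used for illustration; the event is a simplified class with a single public constructor and private setters, similar in shape to `BookInventoryChangedIntegrationEvent`. It relies on Json.NET binding JSON properties to constructor parameters by name when a type has one public parameterized constructor and no default constructor.

```csharp
using System.Text;
using Newtonsoft.Json;

// Hypothetical simplified event: one public constructor, private setters.
public sealed class EditionChangedEvent
{
    public int BookId { get; private set; }
    public string NewEdition { get; private set; }
    public string OldEdition { get; private set; }

    public EditionChangedEvent(int bookId, string newEdition, string oldEdition)
    {
        BookId = bookId;
        NewEdition = newEdition;
        OldEdition = oldEdition;
    }
}

// Hypothetical helper that mirrors the encode and decode steps of a message body.
public static class MessageBodyCodec
{
    // Sender side: serialize to JSON, then to UTF-8 bytes.
    public static byte[] Encode(object payload)
    {
        return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(payload));
    }

    // Receiver side: decode UTF-8 bytes, then deserialize the JSON.
    public static T Decode<T>(byte[] body)
    {
        return JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(body));
    }
}
```

Both sides must agree on the encoding and on the property names, so sharing the event type (or at least its contract) between publisher and subscriber keeps the two in step.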
Using Azure Functions to Synchronize Data across Microservices
The most obvious way to synchronize data across microservices is to subscribe to the event bus directly from the receiving API. However, this would not achieve the desired level of decoupling, as the receiving API would need to run as a continuously running background service rather than as a transient API service. When the service is transient, there is no guarantee that the data will be synchronized as frequently as required.
An alternative to running the service bus receiver within a Windows service or background task is to use an Azure serverless function to obtain the integration event from the service bus queue and process the data.
An example of an Azure Service Bus trigger function that will process our integration event from our source API is shown below:
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.InteropExtensions;
using Newtonsoft.Json;
using System.Text;
using BookLoan.Catalog.API.Events;
using BookLoanMicroservices.Helpers;

namespace BookLoanServiceBusQueueTriggerFunctionApp
{
    public static class ServiceBusQueueTriggerFunction
    {
        [FunctionName("ServiceBusQueueTriggerFunction")]
        public static void Run([ServiceBusTrigger("xxxxxxxqueue",
            Connection = "AzureWebJobsServiceBusQueue")]
            Message myQueueItem, ILogger log)
        {
            // Deserialize the body of the message..
            var body = Encoding.UTF8.GetString(myQueueItem.Body);
            BookInventoryChangedIntegrationEvent bookInventoryChangedIntegrationEvent =
                JsonConvert.DeserializeObject<BookInventoryChangedIntegrationEvent>(body);

            log.LogInformation($"C# ServiceBus queue trigger function processed message: {bookInventoryChangedIntegrationEvent.BookId}");
            log.LogInformation($"C# ServiceBus queue trigger function processed message old edition: {bookInventoryChangedIntegrationEvent.OldEdition}");
            log.LogInformation($"C# ServiceBus queue trigger function processed message new edition: {bookInventoryChangedIntegrationEvent.NewEdition}");
        }
    }
}
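The function above only logs the event, whereas in the steps listed earlier it goes on to call the Loan API UpdateBook() method. The sketch below shows one way that call could be made. `LoanApiClient`, the route `api/Book/UpdateBook/{bookId}`, and the `{ "edition": ... }` payload are all assumptions made for illustration; the real Loan API contract is not shown in this post.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

// Hypothetical client for the Loan API; route and payload are assumptions.
public sealed class LoanApiClient
{
    private readonly HttpClient _http;

    // The HttpClient is expected to have BaseAddress set to the Loan API root.
    public LoanApiClient(HttpClient http)
    {
        _http = http ?? throw new ArgumentNullException(nameof(http));
    }

    // Builds the PUT request separately so it can be inspected in tests.
    public static HttpRequestMessage BuildUpdateRequest(int bookId, string newEdition)
    {
        string json = JsonConvert.SerializeObject(new { edition = newEdition });
        return new HttpRequestMessage(HttpMethod.Put, $"api/Book/UpdateBook/{bookId}")
        {
            Content = new StringContent(json, Encoding.UTF8, "application/json")
        };
    }

    // Sends the update and throws if the Loan API does not report success.
    public async Task UpdateBookEditionAsync(int bookId, string newEdition,
        CancellationToken cancellationToken = default)
    {
        using (HttpRequestMessage request = BuildUpdateRequest(bookId, newEdition))
        using (HttpResponseMessage response = await _http.SendAsync(request, cancellationToken))
        {
            response.EnsureSuccessStatusCode();
        }
    }
}
```

To use it, Run would need to become an `async Task` method and await `UpdateBookEditionAsync` with the deserialized `BookId` and `NewEdition`. With the trigger's default auto-complete behaviour, an exception thrown from the function leaves the message unsettled so that Service Bus can deliver it again.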
A trigger function for Azure Service Bus can be tested locally by running it in the emulated function host console within Visual Studio as shown:
First set a breakpoint within the Azure trigger method in Visual Studio.
Now, using Postman or a client application, we submit a record to our API for update.
After a successful update we will see the updated data in the response pane:
With our message successfully posted into the queue, the breakpoint within the Azure trigger method will be hit and the message parameter can be inspected as shown:
And the Body property of the message will show the JSON content of our posted record:
The message, if it shows as JSON text, can then be deserialized to the destination object we had submitted from the client application or API:
The trigger function console log will show the method log messages:
[8/04/2020 5:15:49 AM] Executing 'ServiceBusQueueTriggerFunction' (Reason='New ServiceBus message detected on 'xxxxxxxxqueue'.', Id=582e98b4-3d49-4fed-9e97-630f6b76d6d9)
[8/04/2020 5:15:49 AM] Trigger Details: MessageId: 0d0e112ccca649b39dc8e10c9bcfeb38, DeliveryCount: 2, EnqueuedTime: 8/04/2020 5:11:38 AM, LockedUntil: 8/04/2020 5:12:38 AM
[8/04/2020 5:15:50 AM] C# ServiceBus queue trigger function processed message: 2
[8/04/2020 5:15:53 AM] C# ServiceBus queue trigger function processed message old edition: 5
[8/04/2020 5:16:05 AM] C# ServiceBus queue trigger function processed message: 2
[8/04/2020 5:16:06 AM] C# ServiceBus queue trigger function processed message new edition: 6
[8/04/2020 5:16:06 AM] C# ServiceBus queue trigger function processed message old edition: 5
[8/04/2020 5:17:15 AM] C# ServiceBus queue trigger function processed message new edition: 6
[8/04/2020 5:17:15 AM] Executed 'ServiceBusQueueTriggerFunction' (Succeeded, Id=582e98b4-3d49-4fed-9e97-630f6b76d6d9)
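Note the DeliveryCount: 2 in the trigger details above: the same message can be delivered more than once, for example when its lock expires while a breakpoint is held. A handler that updates data should therefore tolerate repeats. The sketch below is one hedged way to do that, keyed on the event's Id. `IdempotentHandler` is a hypothetical name, and the in-memory set is for illustration only; a deployed function would need to record processed Ids in durable storage, since function instances do not share memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical guard that remembers which event Ids have already been applied.
public sealed class IdempotentHandler
{
    private readonly HashSet<Guid> _processed = new HashSet<Guid>();
    private readonly Action<int, string> _applyEdition;

    // applyEdition performs the actual update, e.g. a call to the Loan API.
    public IdempotentHandler(Action<int, string> applyEdition)
    {
        _applyEdition = applyEdition ?? throw new ArgumentNullException(nameof(applyEdition));
    }

    // Returns true when the update was applied, false for a repeated delivery.
    public bool Handle(Guid eventId, int bookId, string newEdition)
    {
        if (_processed.Contains(eventId))
        {
            return false;
        }

        _applyEdition(bookId, newEdition);

        // Mark as processed only after the update succeeded, so that a
        // failed attempt is retried on the next delivery.
        _processed.Add(eventId);
        return true;
    }
}
```

The Id property on the IntegrationEvent base class is a natural key for this check, because it is assigned once when the event is created and travels with the message.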
The method can be extended to perform additional data updates to synchronize our data, or to apply other housekeeping or business rules. The most basic form of service bus send and receive passes a plain string as the message body.
We have seen here an example of how to push more complex types across the Azure service bus queue, which would be applicable within a more realistic enterprise microservice application environment.
That’s all for today’s post.
I hope you found this post useful and informative.
Andrew Halil is a blogger, author and software developer with expertise in many areas of the information technology industry, including full-stack web and native cloud-based development, test-driven development and DevOps.