Welcome to today’s post.
In today’s post I will discuss how to use the open source MassTransit library to create a distributed messaging application with .NET Core.
Before I can commence with an example of how we can implement distribute messaging in .NET Core, I will cover some definitions that are part of the distributed messaging landscape.
What is Distributed Messaging?
Distributed messaging is based on the queuing of messages between services and applications. All messages are queued asynchronously. The benefits of a distributed messaging system are scalability, reliability, and consistency.
What is a Transport?
A transport is the queuing mechanism by which we use to transport messages from the source to the destination. Typical transport libraries include Azure Service Bus and RabbitMQ. In addition, the built-in in-memory transport can be used to unit test messaging without having to use a production environment queuing service.
What is MassTransit?
MassTransit is a service bus library that provides encapsulation and abstracts distributed messaging transportation with easier configuration and implementation.
What are some of the alternatives to MassTransit?
An alternative to MassTransit is NServiceBus. Each distributed messaging library has different strengths and weaknesses and vary in licensing. MassTransit is however, a free open-source library that has no licensing or subscription costs in all environments (development, testing, or production). It does not have as many features as some of the paid alternatives, however for a developer or team starting out in the evaluation phase of introducing distributed messaging into their architecture it is a library that has all the essential features that you would need to start off with. These include:
- Multiple transports
- Encrypted messaging
- Unit testable
- Support for ORM providers Entity Framework and NHibernate
- Scheduling
- Message orchestration
- Retries
What is a Message Consumer?
A message consumer is a type that consumes messages from a transport receiver endpoint. Messages are despatched from the source application through the transport. When the message is received successfully by the destination queue, the message is sent through the message consumer.
What are the Endpoints?
Endpoints are used to configure the receiver of the messaging service. With a queuing service we would specify the queue name, connection string, retry limits, message format and so on. The source application would not need to know the configuration details once the endpoint is configured.
A messaging endpoint pattern is defined at the enterprise integration patterns site.
A useful diagram showing how a Message Endpoint is configured between a sender and receiver is shown below:
Diagram provided from the following link.
I will now go through the steps to install and implement a Message consumer with Mass Transit.
Installing MassTransit
Installation within Visual Studio 2019 is quite straightforward. We can use either a worker service application or a console application. In this example we will use a console application.
After creating our project from a template, we then install MassTransit from the package manager as shown with the NuGet package MassTransit.AspNetCore:
Creating Sample Data for Messages
Sample data for an application can be created with some JSON data from an input file.
We can create classes for the JSON data by using a feature within Visual Studio that allows us to paste copied JSON data as a C# class. This is done from the menu item Edit | Paste Special | Paste JSON As Classes as shown below:
The resulting classes have some default names as shown below:
After modifying the generated classes with more meaningful names, the class will look something like the class definitions below:
BookingDataModel.cs:
using BookLoanMTransitWorkerServiceApp.Data;
using System;
using System.Collections.Generic;
using System.Text;
namespace BookLoanMTransitWorkerServiceApp.Models
{
public class BookingDataList
{
public List<LoanBookingDataModel> bookingdata { get; set; }
public BookingDataList()
{
bookingdata = new List<LoanBookingDataModel>();
}
}
public class LoanBookingDataModel
{
public string RequestType { get; set; }
public string LoanedBy { get; set; }
public string DateLoaned { get; set; }
public string DateDue { get; set; }
public string DateReturn { get; set; }
public int BookID { get; set; }
}
}
The DTO class above can be used to model Book loan transactions from library members.
Application Setup and Configuration for MassTransit
To setup an application with MassTransit distributed messaging we require the following components:
- Application configuration
- A background worker process
- A message consumer
- Application data source
The application configuration is achieved at the start up within the program’s ConfigureServices() as shown:
Program.cs:
using MassTransit;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using BookLoanMTransitWorkerServiceApp.Interfaces;
using BookLoanMTransitWorkerServiceApp.Data;
namespace BookLoanMTransitWorkerServiceApp
{
class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddConsumer<MessageConsumer>();
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService(true);
services.AddSingleton<ILoanBookingData, LoanBookingData>();
services.AddHostedService<Worker>();
});
}
}
The configuration to enable MassTransit library is done using the Addxxx() extension method of the service collection:
services.AddMassTransit()
The configuration of the transport is done using the Usingxxx() extension method:
x.UsingInMemory((context, cfg)
Configuration of the message consumer class is done using the AddConsumer() extension method:
x.AddConsumer<T>()
Where T is the message consumer class.
Configuration of both receive and send endpoint is done with the ConfigureEndpoints() extension method:
cfg.ConfigureEndpoints(context)
To add MassTransit as a hosted service we use the following extension method:
services.AddMassTransitHostedService(true)
The MassTransit messaging service is setup in a similar way to how we setup background tasks in hosted services in one of my previous posts. The background work process implementation will be discussed in the next section.
Implementation of the Background Worker Process
The worker process is implemented as follows:
Worker.cs:
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using MassTransit;
using BookLoanMTransitWorkerServiceApp.Interfaces;
using BookLoanMTransitWorkerServiceApp.Models;
namespace BookLoanMTransitWorkerServiceApp
{
public class Worker : BackgroundService
{
readonly IBus _bus;
private readonly ILoanBookingData _loanBookingData;
public Worker(IBus bus, ILoanBookingData loanBookingData)
{
_bus = bus;
_loanBookingData = loanBookingData;
_loanBookingData.LoadBookingData();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var recordCounter = 1;
var numberRecords =
(_loanBookingData.bookingDataList.bookingdata == null ? 0:
_loanBookingData.bookingDataList.bookingdata.Count);
while (!stoppingToken.IsCancellationRequested)
{
LoanBookingDataModel loanBooking =
_loanBookingData.GetNthRecord(recordCounter);
recordCounter++;
await _bus.Publish(new Message {
Data = loanBooking,
Text = $"The time is {DateTime.Now}"
});
await Task.Delay(10000, stoppingToken);
if (recordCounter > numberRecords)
break;
}
}
}
}
To summarize, the above worker process does the following:
- Loads the input booking data from the constructor.
- The worker task loops indefinitely until cancellation.
- Within the worker task loop the following actions are performed:
- Obtain the next record from the input data.
- Publish the record object to the message queue.
- Wait another 10 seconds.
Implementation of the Message Consumer
The message consumer implementation is shown below:
MessageConsumer.cs:
using System.Threading.Tasks;
using BookLoanMTransitWorkerServiceApp.Models;
using MassTransit;
using Microsoft.Extensions.Logging;
namespace BookLoanMTransitWorkerServiceApp
{
public class Message
{
public string Text { get; set; }
public LoanBookingDataModel Data { get; set; }
}
public class MessageConsumer : IConsumer<Message>
{
readonly ILogger<MessageConsumer> _logger;
public MessageConsumer(ILogger<MessageConsumer> logger)
{
_logger = logger;
}
public Task Consume(ConsumeContext<Message> context)
{
string messageDetails =
$"Book {context.Message.Data.BookID} loaned on " +
$"{context.Message.Data.DateLoaned} by member " +
$"{context.Message.Data.LoanedBy}.";
string messageDateTime = context.Message.Text;
_logger.LogInformation("DateTime: {0}, Received Message: {1}",
messageDateTime, messageDetails);
return Task.CompletedTask;
}
}
}
With a message consumer, we have inherited from the IConsumer<Message> interface. This required us to implement the method:
public Task Consume(ConsumeContext<Message> context)
Within the consumer Consume() method we obtain the message from the context, then log the details to the console output.
Implementation of the Application Data Source
The data source interface is declared as shown:
ILoanBookingData.cs:
using BookLoanMTransitWorkerServiceApp.Models;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace BookLoanMTransitWorkerServiceApp.Interfaces
{
public interface ILoanBookingData
{
public BookingDataList bookingDataList { get; set; }
public Task LoadBookingData();
public LoanBookingDataModel GetNthRecord(int n);
}
}
The data source is implemented as shown:
LoanBookingData.cs:
using BookLoanMTransitWorkerServiceApp.Interfaces;
using BookLoanMTransitWorkerServiceApp.Models;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace BookLoanMTransitWorkerServiceApp.Data
{
public class LoanBookingData: ILoanBookingData
{
public BookingDataList bookingDataList { get; set; }
public LoanBookingData()
{
bookingDataList = new BookingDataList()
{
bookingdata = new List<LoanBookingDataModel>()
};
}
public async Task LoadBookingData()
{
if (bookingDataList.bookingdata.Count > 0)
return;
string fileName = "DataFolder/loan_book_sample.json.txt";
using FileStream openStream = File.OpenRead(fileName);
try
{
BookingDataList bookingData =
await JsonSerializer
.DeserializeAsync<BookingDataList>(openStream);
bookingDataList.bookingdata.AddRange(bookingData.bookingdata);
}
catch (Exception ex)
{
Console.WriteLine($"Error reading input file: {ex.Message.ToString()}");
}
}
public LoanBookingDataModel GetNthRecord(int n)
{
var count = 0;
if (bookingDataList?.bookingdata.Count > 0)
{
foreach (LoanBookingDataModel item in bookingDataList.bookingdata)
{
count++;
var loanRecord = (LoanBookingDataModel)item;
if (count >= n)
return loanRecord;
}
}
return null;
}
}
}
The above simulates data by loading from a file containing JSON array data. In more realistic scenarios, we would load updated data from a SQL data source. The method
public LoanBookingDataModel GetNthRecord(int n)
Retrieves the next record from the data source.
Finally, running the above console app produces the following output from the message consumer every 10 seconds:
I hope you have found this post as a useful starting point for implementing distributed messaging applications.
Additional details and reference documents can be found in the MassTransit site.
For Enterprise Messaging Patterns including the Message Endpoint pattern please refer to the Enterprise Messaging Patterns site for more details.
That is all for today’s post.
I hope you found this post useful and informative.
Andrew Halil is a blogger, author and software developer with expertise of many areas in the information technology industry including full-stack web and native cloud based development, test driven development and Devops.