Data Analytics
.NET Core Azure Azure Event Hub C# Visual Studio

How to Send Events to Azure Event Hubs with .NET Core

What is an Azure Event Hub?

An Azure Event Hub is a cloud-based broker service that acts to decouple event client requests from backend services in the cloud.

An Event Hub receives client events, stores the events within cloud storage in Azure Blob Storage, then an event processor reads the events from cloud storage and processes the events.

What are typical applicable real world uses for Event Hubs?

  1. Tracking login events from client applications that are accessed in different locations. Occasionally we have users that login from different locations. We would like to check if suspicious logins attempts occur within a short timespan for the same login.
  • Processing application logging output to analyse types of issues and errors that occur within an application in the production environment.
  • Retrieving and analysing event data from a web site such as user page selections, actions and so forth.

The first step is to create an event hub in the Azure portal. We can do this manually in the portal or we can open the Azure CLI and create an event hub as follows: We first initialize variables with generated random values that we can use for our resources:

random_1=$RANDOM
random_2=$RANDOM

Next, we initialize some variables as shown for the location and resource group:

location=australiaeast
resourceGroup=BookLoanResources$random_1

Create a resource group:

az group create --name $resourceGroup \
    --location $location

Generate a variable to hold the event hub namespace:

eventHubNamespace=BookLoanNamespace$random_1
echo "Event Hub Namespace = " $eventHubNamespace

Create an event hub namespace, which must be unique within Azure:

az eventhubs namespace create --resource-group $resourceGroup \
   --name $eventHubNamespace \
   --location $location \
   --sku Basic

Following creation of the above namespace, you will see the JSON output as shown:

{
  "clusterArmId": null,
  "createdAt": "2021-05-07T15:47:06.887000+00:00",
  "encryption": null,
  "id": "/subscriptions/xxx-xxx-xxx-xxx-xxx/resourceGroups/BookLoanResources867/providers/Microsoft.EventHub/namespaces/BookLoanNamespace867",
  "identity": null,
  "isAutoInflateEnabled": false,
  "kafkaEnabled": false,
  "location": "Australia East",
  "maximumThroughputUnits": 0,
  "metricId": "xxx-xxx-xxx-xxx-xxx:bookloannamespace867",
  "name": "BookLoanNamespace867",
  "provisioningState": "Succeeded",
  "resourceGroup": "BookLoanResources867",
  "serviceBusEndpoint": "https://BookLoanNamespace867.servicebus.windows.net:443/",
  "sku": {
    "capacity": 1,
    "name": "Basic",
    "tier": "Basic"
  },
  "tags": {},
  "type": "Microsoft.EventHub/Namespaces",
  "updatedAt": "2021-05-07T15:47:57.100000+00:00",
  "zoneRedundant": false
}

Next, we generate a random variable to hold the event hub name:

eventHubName=BookLoanEventHub$random_1
echo "event hub name = " $eventHubName

Create the event hub from the unique namespace and name with a message retention of 1 day and 2 partitions:

az eventhubs eventhub create --resource-group $resourceGroup \
    --namespace-name $eventHubNamespace \
    --name $eventHubName \
    --message-retention 1 \
    --partition-count 2

Following creation of the above event hub, you will see the JSON output as shown:

{
  "captureDescription": null,
  "createdAt": "2021-05-07T15:53:06.060000+00:00",
  "id": "/subscriptions/xxx-xxx-xxx-xxx-xxx /resourceGroups/BookLoanResources867/providers/Microsoft.EventHub/namespaces/BookLoanNamespace867/eventhubs/BookLoanEventHub867",
  "location": "Australia East",
  "messageRetentionInDays": 1,
  "name": "BookLoanEventHub867",
  "partitionCount": 2,
  "partitionIds": [
    "0",
    "1"
  ],
  "resourceGroup": "BookLoanResources867",
  "status": "Active",
  "type": "Microsoft.EventHub/Namespaces/EventHubs",
  "updatedAt": "2021-05-07T15:53:06.307000+00:00"
}

Obtain the connection string that we use to authenticates the application with the event hub service:

connectionString=$(az eventhubs namespace authorization-rule keys list \
   --resource-group $resourceGroup \
   --namespace-name $eventHubNamespace \
   --name RootManageSharedAccessKey \
   --query primaryConnectionString \
   --output tsv)
echo "Connection string = " $connectionString

The resulting connection string to the event hub endpoint will resemble the following:

(
    Connection string =  Endpoint=sb://[service bus namespace].servicebus.windows.net/;
    SharedAccessKeyName=RootManageSharedAccessKey;
    SharedAccessKey=[your shared access key]
)

After we have run the script, we can see the event hub namespace in the Azure Portal:

And the event hub:

For us to be able to make the most of Event Hubs requires understanding what throughput units and partitions are:

What is a throughput unit?

A throughput unit is the level of scalability of our event hub.

A single throughput unit allows us to ingress (enters the event hub) at up to 1MB / 1000 events per second and egress (exits the event hub) at up to 2MB / 4096 events per second.

What are partitions?

A partition is how we organize events received by an event hub into partitions.

Partitions can be used to increase the number of logs that can be processed by an event hub. Applications that process events from an event hub will require multiple processes to handle the events.

How are events assigned to partitions?

Events are assigned to partitions by using a partition key. When an application (the sender) sends a value to the event hub, it is hashed and then the partition assignment is created.

Generating source events

We now have an event hub which can consume or ingest data from a client. What we will do now is to create some events from a client which can be a service such as a Web API, a Web application, a backend Scheduled Task, a long-running service such as a Windows Service, an Azure Web Job, or an Azure Function.

Below is a diagram showing how we would in a typical scenario push data from client applications via a data store and a worker service into the Event Hub:

The event I will produce will be login events for the online library application. To save time I will generate some sample event data records in a SQL database and have these records read in from a console application.

The structure of our login events is as shown:

UserLogin          string
AttemptDateTime    datetime
Location           string

In our client application we first install the NuGet package:

Azure.Messaging.EventHubs

I add some sample records into the following SQL database table:

CREATE TABLE [dbo].[LoginAudit](
	[ID] [int] IDENTITY(1,1) NOT NULL,
	[UserLogin] [varchar](100) NOT NULL,
	[Country] [varchar](50) NOT NULL,
	[WhenLoggedIn] [datetime] NOT NULL,
 CONSTRAINT [PK_LoginAudit] PRIMARY KEY CLUSTERED 
(
	[ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

The sample DML script is shown:

DELETE FROM [aspnet-BookCatalog].[dbo].[LoginAudits]

INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:15:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:17:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:18:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:20:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'New Zealand', '2021-05-10 00:21:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:23:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:26:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:27:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'New Zealand', '2021-05-10 00:29:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-10 00:32:00');
INSERT INTO [aspnet-BookCatalog].[dbo].[LoginAudits] VALUES 
('[email protected]', 'Australia', '2021-05-11 00:00:00');

I then construct a .NET Core console app with a background worker process which is polled to read the audit log and submit these in batches within 5-minute intervals of the [WhenLoggedIn] field.

The method that dispatches event collections as a batch is shown below:

public async Task SendEventBatch(List<LoginAuditViewModel> entries)
{
    if (entries.Count == 0)
   	{
        Console.WriteLine($"There are no batch event entries to push.");
   	        return;
    } 

    await using (var producerClient = new  
  	    EventHubProducerClient(_eventHubConnectionString, _eventHubName))
    {
        using EventDataBatch eventBatch = await 
 	   	    producerClient.CreateBatchAsync();

 		foreach (LoginAuditViewModel item in entries)
        {
            this.InsertMessage(item, eventBatch);
        }

        await producerClient.SendAsync(eventBatch);
      	_logger.LogInformation(
            $"SendEventBatch() .. a batch of {entries.Count} events has been published.");
        Console.WriteLine($"A batch of {entries.Count} events has been published.");
   	}
}

The method that pushes each event payload message to the event hub is shown:

public void InsertMessage(LoginAuditViewModel message, EventDataBatch eventBatch)
{
    var payload = new
   	{
        ID = message.ID.ToString(),
        UserLogin = message.UserLogin.ToString(),
        Country = message.Country.ToString(),
       	WhenLoggedIn = message.WhenLoggedIn.ToString()
    };

    var eventMessage = JsonConvert.SerializeObject(payload);

    eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(eventMessage)));

    Console.WriteLine($"Inserted: {message}");
}

Our console shows the following three records retrieved for processing:

An example of one of the serialized events is shown:

After the initial event batch is posted, the event hub shows below with a spike in the request and a value of 3:

The following initial three records have been posted to the event hub:

'[email protected]', 'Australia', '2021-05-10 00:15:00'
'[email protected]', 'Australia', '2021-05-10 00:17:00'
'[email protected]', 'Australia', '2021-05-10 00:18:00'

After the 11 records are posted we will see the state of the event hub timeline for requests, messages, and throughput:

We have seen how to post records into the Azure Event Hub, where they will be retained for a period of time, of which is determined by the initial settings of the event hub namespace and our subscription plan. On the above example I used the basic plan, which gives a 1-day retention, however for higher throughput and retention, you might select the Standard or Dedicated plans.

If we wish to retain and process the event hub data longer than the retention period, we can optionally set the event hub data to be captured within Azure Storage as shown:

The data capture option is available in the Standard tier and above. 

In the next post I will show how to receive the data from the event hub and process it.

That is all for today’s post.

I hope you have found this post useful and informative.

Social media & sharing icons powered by UltimatelySocial