
How to Throttle Parallel Batch Data Processing Tasks with .NET Core

Welcome to today’s post.

In today’s post I will discuss what throttling is in the context of multi-threaded tasks and how to use it within a .NET Core Web API application.

What is throttling? Throttling is a technique used to limit the rate or the number of concurrent tasks running within a process, and therefore their throughput.

When Do We Use Throttling?

There are situations where excessive throughput can overwhelm a destination hardware or software resource, such as file I/O, database reads and writes, or even the CPU, each of which can only handle a limited number of queued requests before it becomes a bottleneck. With file I/O, there is an I/O bottleneck caused by limits on concurrent writes. With a database, there is the potential for read and write deadlocks.

With the CPU, there will be thread pool starvation, with multiple work items competing for the same CPU processing from the thread pool. If the number of queued work items exceeds the number of threads in the thread pool, the work items will be delayed until more threads are created in the thread pool. The number of threads the thread pool creates is determined by a few factors, such as available memory, CPU saturation, and the number of queued work items.
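To see the limits the thread pool is working within on a given machine, a small diagnostic helper like the one below can read them with ThreadPool.GetMinThreads, GetMaxThreads and GetAvailableThreads. ThreadPoolInfo is a hypothetical name, and the values it reports vary by machine and configuration, so treat it as a sketch for inspection rather than tuning advice.

```csharp
using System;
using System.Threading;

// Hypothetical diagnostic helper (not part of the original service).
public static class ThreadPoolInfo
{
    // Returns the worker-thread limits the thread pool is currently using.
    public static (int MinWorkers, int MaxWorkers, int AvailableWorkers) Snapshot()
    {
        // Up to the minimum, the pool creates threads on demand; beyond it,
        // its heuristics decide whether to add threads or wait for work to finish.
        ThreadPool.GetMinThreads(out int minWorkers, out _);

        // The upper bound on worker threads the pool will create.
        ThreadPool.GetMaxThreads(out int maxWorkers, out _);

        // The maximum minus the worker threads that are currently busy.
        ThreadPool.GetAvailableThreads(out int availableWorkers, out _);

        return (minWorkers, maxWorkers, availableWorkers);
    }

    public static void Print()
    {
        var (min, max, available) = Snapshot();
        Console.WriteLine($"Worker threads: min={min}, max={max}, available={available}");
    }
}
```

When available drops close to zero while work items keep queuing, the delays described above are likely.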

Remediation to Scale Multiple Threads

To remediate these situations and allow the concurrent tasks to scale, we can apply one of two common solutions:

  1. Create a message queue between the source tasks and the destination service.
  2. Throttle or delay the throughput of the source tasks before they are dispatched to the destination service.

In a previous post, I showed how to use message queuing to improve the scalability of parallel or batch tasks within a service or application.
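As a rough in-process illustration of option 1 only, the sketch below places a bounded queue from System.Threading.Channels (available in .NET Core 3.0 and later) between a producer and a single consumer. QueueDemo and ProcessAsync are hypothetical names, and Task.Delay stands in for the real destination call. Because the channel is bounded, the producer waits whenever the queue is full, and the destination only ever receives one item at a time.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Sketch of option 1: a bounded in-process queue between the source tasks
// (producer) and the destination service (single consumer).
public static class QueueDemo
{
    public static async Task<List<string>> ProcessAsync(IEnumerable<string> items)
    {
        // Capacity 2: WriteAsync waits while the queue is full (back-pressure).
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(2)
        {
            FullMode = BoundedChannelFullMode.Wait
        });
        var processed = new List<string>(); // touched only by the consumer task

        // Single consumer: the destination handles one item at a time.
        Task consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                await Task.Delay(10); // stand-in for the destination call
                processed.Add(item);
            }
        });

        foreach (var item in items)
            await channel.Writer.WriteAsync(item);

        channel.Writer.Complete(); // no more items; ends ReadAllAsync
        await consumer;
        return processed;
    }
}
```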

An example where throttling is useful is when we concurrently upload multiple files through a Web API upload method and wish to limit the throughput of the database or file upload tasks being processed.

In the API controller method below, we accept multiple files from a client application:

[HttpPost("api/[controller]/UploadFiles")]
public async Task<IActionResult> UploadFiles([FromForm] List<IFormFile> files)
{
    var uploadResponse = await _fileUtilityService.UploadFiles(files);
    if (uploadResponse.ErrorMessage != "")
        return BadRequest(new { error = uploadResponse.ErrorMessage });
    return Ok(uploadResponse);
}

We can test the file input payload using tools such as Postman or Swagger (OpenAPI).
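The same request can also be built from C#. The sketch below is a hypothetical client-side helper (UploadRequestBuilder is not part of the original API) that assembles the multipart/form-data body with MultipartFormDataContent. It assumes each part must use the form field name files, matching the List<IFormFile> files action parameter so that model binding can collect the uploads.

```csharp
using System.Collections.Generic;
using System.Net.Http;

// Hypothetical client-side helper that builds the multipart/form-data body
// a tool such as Postman would send to the UploadFiles endpoint.
public static class UploadRequestBuilder
{
    public static MultipartFormDataContent Build(IDictionary<string, byte[]> files)
    {
        var content = new MultipartFormDataContent();
        foreach (var file in files)
        {
            // Every part uses the field name "files" so that ASP.NET Core
            // model binding collects them into List<IFormFile> files.
            content.Add(new ByteArrayContent(file.Value), "files", file.Key);
        }
        return content;
    }
}
```

Posting it is then a call such as `await client.PostAsync("api/FileUtility/UploadFiles", content)` on an HttpClient whose BaseAddress points at the running API; the host and port depend on your launch settings.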

The service class method that handles the concurrent file upload is shown below:

using System;
using System.IO;
…
using System.Collections.Generic;
using System.Threading.Tasks; // Task, Task.Run, Task.WhenAll
using System.Threading; // SemaphoreSlim, Interlocked
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent; // ConcurrentBag
…

public async Task<FileUploadResponse> UploadFiles(List<IFormFile> files)
{
    bool IsDatabaseSaveMocked = false;

    ConcurrentBag<FileUploadResponseData> uploadedFiles = new 
        ConcurrentBag<FileUploadResponseData>();

    this._numberFilesUploaded = 0;
    this._errorMessage = "";

    try
    {
        if (files.Count > (int)ApplicationConstants.MAXIMUM_UNTHROTTLED_FILES)
        {
            var throttler = new SemaphoreSlim(initialCount: 3);
            List<Task> uploadTasks = new List<Task>();
                    
            foreach (var f in files)
            {
                uploadTasks.Add(Task.Run(async () =>
                {                         
                    _logger.LogInformation(
                        "Uploading file: " + f.FileName);

                    await throttler.WaitAsync();
                            
                    // CurrentCount is the number of free slots left,
                    // not the number of files still waiting.
                    _logger.LogInformation("Number files waiting: " + 
                        throttler.CurrentCount.ToString());

                    try
                    {
                        await this.UploadSingleFile(f, 
                            uploadedFiles, IsDatabaseSaveMocked);
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
            }

            await Task.WhenAll(uploadTasks);
            _logger.LogInformation(
                string.Format(StringConstants.NUMBER_FILES_UPLOADED,
                    files.Count));

            return new FileUploadResponse() { 
                Data = uploadedFiles.ToList(), 
                ErrorMessage = "" 
            };
        }
        else
        {
            foreach (var f in files)
            {
                await this.UploadSingleFile(f, 
                    uploadedFiles, 
                    IsDatabaseSaveMocked);
            }

            _logger.LogInformation(
                string.Format(
                    StringConstants.NUMBER_FILES_UPLOADED, 
                    files.Count
                )
            );

            return new FileUploadResponse() { 
                Data = uploadedFiles.ToList(), 
                ErrorMessage = "" 
            };
        }
    }
    catch (Exception ex)
    {
        // Log as an error and include the exception details
        _logger.LogError(ex,
            StringConstants.ERROR_UPLOADING_MULTIPLE_FILES
        );

        return new FileUploadResponse() { 
            ErrorMessage = ex.Message 
        };
    }
}

Within the above method, we use the thread-safe collection ConcurrentBag to accumulate the list of processed upload files:

ConcurrentBag<FileUploadResponseData> uploadedFiles = new 
    ConcurrentBag<FileUploadResponseData>();
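To see why a thread-safe collection matters here, the sketch below adds 100 items from parallel tasks. A plain List<T> is not safe for concurrent writes and can lose items or throw; ConcurrentBag<T> needs no extra locking. BagDemo is a hypothetical name. Because a ConcurrentBag does not preserve insertion order, the sketch sorts the items before returning them.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class BagDemo
{
    // Adds 'count' items from parallel tasks; ConcurrentBag needs no external lock.
    public static async Task<int[]> CollectAsync(int count)
    {
        var bag = new ConcurrentBag<int>();
        var tasks = Enumerable.Range(0, count)
            .Select(i => Task.Run(() => bag.Add(i)));

        await Task.WhenAll(tasks);

        // Enumeration order is unspecified, so sort before relying on order.
        return bag.OrderBy(x => x).ToArray();
    }
}
```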

In the next section, I will explain how throttling works in the above code excerpt.

The Key to Throttling: SemaphoreSlim

We use a semaphore synchronization object to limit the maximum number of tasks that can run the throttled code concurrently:

var throttler = new SemaphoreSlim(initialCount: 3);

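In the case of file uploads, this limits the number of simultaneous file uploads to at most 3. The semaphore only limits concurrency; it does not make the upload list thread-safe. Up to three uploads can still add to the list at the same time, which is why the list is a ConcurrentBag.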

Each task is stored within a list where we can keep track of the completion state: 

List<Task> uploadTasks = new List<Task>();

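The throttling pattern consists of acquiring a slot from the semaphore before the throttled work starts and releasing it when the work finishes. When all 3 slots are in use, WaitAsync() does not complete until a running task releases its slot, so no more than 3 tasks run the throttled section at once. Releasing the slot in a finally block ensures it is returned even if the task throws an exception: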

await throttler.WaitAsync(); // acquire a slot (decrements throttler.CurrentCount)

try
{
    // perform a task that we will throttle.
}
finally
{
    throttler.Release(); // release the slot (increments throttler.CurrentCount)
}
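As a self-contained check of the pattern, the following sketch runs ten simulated jobs through a semaphore with three slots and records the highest number of jobs that were ever inside the throttled section at the same time. ThrottleDemo and RunAsync are hypothetical names, and Task.Delay stands in for the upload. Unlike the service method, it also passes maxCount to the SemaphoreSlim constructor, so an accidental extra Release() throws a SemaphoreFullException instead of silently raising the limit.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs 'jobs' simulated work items with at most 'limit' inside the
    // throttled section at once, and returns the peak concurrency observed.
    public static async Task<int> RunAsync(int jobs, int limit)
    {
        var throttler = new SemaphoreSlim(limit, limit);
        var gate = new object();
        int running = 0, peak = 0;

        var tasks = Enumerable.Range(0, jobs).Select(async _ =>
        {
            await throttler.WaitAsync(); // wait for a free slot
            lock (gate)
            {
                running++;
                peak = Math.Max(peak, running);
            }
            try
            {
                await Task.Delay(20); // stand-in for the throttled upload
            }
            finally
            {
                lock (gate) { running--; }
                throttler.Release(); // hand the slot to the next waiting job
            }
        }).ToList();

        await Task.WhenAll(tasks);
        return peak;
    }
}
```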

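The semaphore's CurrentCount property reports the number of free slots, not the number of running tasks. Each completed WaitAsync() decrements CurrentCount, down to a minimum of 0, and each Release() increments it again, back up to 3 once all slots have been released. This is why the "Number files waiting" log message in the service method actually reports the number of free slots remaining.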
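The following sketch (CurrentCountDemo is a hypothetical name) steps a three-slot semaphore through two acquisitions and one release, using the synchronous Wait() for brevity, and records CurrentCount after each step. It returns 3, 2, 1 and 2.

```csharp
using System.Threading;

public static class CurrentCountDemo
{
    // Records CurrentCount at the start, after two Waits, and after one Release.
    public static int[] Steps()
    {
        var throttler = new SemaphoreSlim(initialCount: 3);
        int start = throttler.CurrentCount;        // 3 free slots

        throttler.Wait();
        int afterFirst = throttler.CurrentCount;   // 2 free slots

        throttler.Wait();
        int afterSecond = throttler.CurrentCount;  // 1 free slot

        throttler.Release();
        int afterRelease = throttler.CurrentCount; // 2 free slots

        return new[] { start, afterFirst, afterSecond, afterRelease };
    }
}
```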

Starting each upload with Task.Run allows the files in the collection to be processed concurrently, and Task.WhenAll returns a single task that completes when all of the upload tasks have completed:

foreach (var f in files)
{
    uploadTasks.Add(Task.Run(async () =>
    { 
        // perform the concurrent task.
    }));
}

await Task.WhenAll(uploadTasks);
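On .NET 6 or later, Parallel.ForEachAsync with ParallelOptions.MaxDegreeOfParallelism can replace the combination of Task.Run, a task list and a SemaphoreSlim. This is an alternative to the approach in this post and needs a newer runtime than .NET Core; ForEachAsyncDemo and UploadAllAsync are hypothetical names, and Task.Delay stands in for UploadSingleFile.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Requires .NET 6 or later: Parallel.ForEachAsync caps concurrency itself,
    // so no explicit SemaphoreSlim or task list is needed.
    public static async Task<List<string>> UploadAllAsync(IEnumerable<string> fileNames, int limit)
    {
        var uploaded = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = limit };

        await Parallel.ForEachAsync(fileNames, options, async (name, cancellationToken) =>
        {
            await Task.Delay(10, cancellationToken); // stand-in for UploadSingleFile
            uploaded.Add(name);
        });

        // Sorted only to make the result predictable; completion order varies.
        return uploaded.OrderBy(n => n).ToList();
    }
}
```

The trade-off is that the limit is expressed once, as an option, instead of being spread across WaitAsync and Release calls.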

Uploading a Single File Task

Each file upload can be considered a single work item that runs on a thread allocated from the thread pool.

Our file upload functionality that is called within each task run is shown below:

private async Task UploadSingleFile(IFormFile f, 
    ConcurrentBag<FileUploadResponseData> uploadedFiles,
    Boolean mockSave)
{
    string name = f.FileName.Replace(@"\\\\", @"\\");

    if (f.Length > 0)
    {
        // Check file name is valid before allocating the buffer
        if (f.FileName.Length == 0)
        {
            uploadedFiles.Add(new FileUploadResponseData()
            {
                Id = 0,
                Status = StringConstants.ERROR_MESSAGE,
                FileName = Path.GetFileName(name),
                ErrorMessage = string.Format(
                    StringConstants.INVALID_FILE_NOT_UPLOADED, name)
            });
            this._errorMessage = StringConstants.ERROR_MESSAGE;
            return;
        }

        var memoryStream = new MemoryStream();

        try
        {
            await f.CopyToAsync(memoryStream);

            // Only upload the file if it is smaller than the maximum size (2 MB)
            if (memoryStream.Length < 
                (int)ApplicationConstants.MAXIMUM_FILE_UPLOAD_SIZE)
            {
                var file = new FileUploadInfo()
                {
                    FileName = Path.GetFileName(name),
                    FileSize = memoryStream.Length,
                    UploadDate = DateTime.Now,
                    UploadedBy = "Admin",
                    FileContent = memoryStream.ToArray()
                };

                if (!mockSave)
                {
                    lock (this)
                    {
                        _db.UploadedFiles.Add(file);
                        _db.SaveChanges();
                        _logger.LogInformation(
                            "Uploaded file: " + name);
                    }
                }
                else
                {
                    var rand = new Random();
                    int mSec = rand.Next(9, 999);
                    Thread.Yield();
                    Thread.Sleep(mSec * 10);
                    _logger.LogInformation("Uploaded file: " + 
                        name + ". Time taken: " + mSec.ToString());
                }

                uploadedFiles.Add(new FileUploadResponseData()
                {
                    Id = file.Id,
                    Status = StringConstants.OK_MESSAGE,
                    FileName = Path.GetFileName(name),
                    ErrorMessage = "",
                });

                // ++ is not atomic when several uploads finish at once
                Interlocked.Increment(ref this._numberFilesUploaded);
                if (this._errorMessage != StringConstants.ERROR_MESSAGE)
                    this._errorMessage = StringConstants.OK_MESSAGE;
            }
            else
            {
                uploadedFiles.Add(new FileUploadResponseData()
                {
                    Id = 0,
                    Status = StringConstants.ERROR_MESSAGE,
                    FileName = Path.GetFileName(name),
                    ErrorMessage = 
                        string.Format(
                            StringConstants.LARGE_FILE_NOT_UPLOADED, 
                            name)
                });
                this._errorMessage = StringConstants.ERROR_MESSAGE;
            }
        }
        finally
        {
            // Dispose also closes the stream
            memoryStream.Dispose();
        }
    }
}

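Within the above database access code, we serialize access to the database context with a lock. Assuming _db is an Entity Framework Core DbContext, this is necessary because a DbContext is not thread-safe: using the same instance from multiple threads at once can throw an InvalidOperationException or leave its change tracking in an inconsistent state. Locking on a dedicated private object rather than this is generally recommended, since code outside the class can also lock on the same instance: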

lock (this)
{
    _db.UploadedFiles.Add(file);
    _db.SaveChanges();
    _logger.LogInformation("Uploaded file: " + name);
}
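A lock statement cannot contain an await, which is why the code above calls the synchronous SaveChanges(). If you would rather call SaveChangesAsync(), a common pattern is to use a SemaphoreSlim(1, 1) as an asynchronous lock. The sketch below assumes a hypothetical SerializedStore class in which a List<string> stands in for the non-thread-safe database context and Task.Yield stands in for the awaited save.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a non-thread-safe resource such as a DbContext.
public class SerializedStore
{
    private readonly List<string> _rows = new List<string>();       // not thread-safe
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1); // async "lock"

    public async Task SaveAsync(string row)
    {
        await _gate.WaitAsync(); // only one caller at a time past this point
        try
        {
            // Stand-in for an awaited call such as SaveChangesAsync();
            // an await like this is not allowed inside a lock statement.
            await Task.Yield();
            _rows.Add(row);
        }
        finally
        {
            _gate.Release();
        }
    }

    public int Count => _rows.Count;
}
```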

In the next section, I will show how to run the above multiple file upload task in a test simulation.

Running Test File Upload Simulations

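To test the above threading and throttling pattern, we set the mock flag (IsDatabaseSaveMocked) to true, which skips the database save and instead simulates an upload of random duration. Note that the thread sleeps for mSec * 10 milliseconds, so the logged "Time taken" value is in units of 10 ms: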

var rand = new Random();
int mSec = rand.Next(9, 999);
Thread.Yield();
Thread.Sleep(mSec * 10);
_logger.LogInformation("Uploaded file: " + name + ". Time taken: " + mSec.ToString());

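When running the above in test/mock mode, we get the following output. It shows that throttling limits the number of concurrent uploads: all ten tasks start, but only three pass the semaphore, and each further upload begins only after a running upload completes and releases its slot: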

File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-3.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-1.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-2.txt
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 1
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 2
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-11.xls
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-9.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-8.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-7.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-6.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-5.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploading file: test-file-4.txt
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-1.txt. Time taken: 50
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-3.txt. Time taken: 758
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-11.xls. Time taken: 724
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-2.txt. Time taken: 923
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-9.txt. Time taken: 237
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-6.txt. Time taken: 208
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-7.txt. Time taken: 485
File.Uploader.API.Services.FileUtilityService: Information: Number files waiting: 0
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-5.txt. Time taken: 473
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-8.txt. Time taken: 921
File.Uploader.API.Services.FileUtilityService: Information: Uploaded file: test-file-4.txt. Time taken: 409
File.Uploader.API.Services.FileUtilityService: Information: 10 files have been uploaded successfully.
Microsoft.AspNetCore.Mvc.Infrastructure.ObjectResultExecutor: Information: Executing ObjectResult, writing value of type 'File.Uploader.API.Models.FileUploadResponse'.
Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker: Information: Executed action File.Uploader.API.Controllers.FileUtilityController.UploadFiles (File.Uploader.API) in 25651.7835ms
Microsoft.AspNetCore.Routing.EndpointMiddleware: Information: Executed endpoint 'File.Uploader.API.Controllers.FileUtilityController.UploadFiles (File.Uploader.API)'
Microsoft.AspNetCore.Hosting.Diagnostics: Information: Request finished in 25765.659ms 200 application/json; charset=utf-8

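After the uploads have completed, the API outputs the response, which is the list of uploaded files. ConcurrentBag does not guarantee any enumeration order, so the entries are not necessarily in the order of the upload request. The id values, which the database assigns as each record is saved, show the order in which the records were written: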

{
  "errorMessage": "",
  "data": [
    {
      "id": 185,
      "status": "OK",
      "fileName": "test-file-5.txt",
      "errorMessage": ""
    },
    {
      "id": 183,
      "status": "OK",
      "fileName": "test-file-9.txt",
      "errorMessage": ""
    },
    {
      "id": 180,
      "status": "OK",
      "fileName": "test-file-6.txt",
      "errorMessage": ""
    },
    {
      "id": 178,
      "status": "OK",
      "fileName": "test-file-4.txt",
      "errorMessage": ""
    },
    {
      "id": 184,
      "status": "OK",
      "fileName": "test-file-11.xls",
      "errorMessage": ""
    },
    {
      "id": 182,
      "status": "OK",
      "fileName": "test-file-8.txt",
      "errorMessage": ""
    },
    {
      "id": 179,
      "status": "OK",
      "fileName": "test-file-3.txt",
      "errorMessage": ""
    },
    {
      "id": 177,
      "status": "OK",
      "fileName": "test-file-2.txt",
      "errorMessage": ""
    },
    {
      "id": 181,
      "status": "OK",
      "fileName": "test-file-7.txt",
      "errorMessage": ""
    },
    {
      "id": 176,
      "status": "OK",
      "fileName": "test-file-1.txt",
      "errorMessage": ""
    }
  ]
}
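If a client needs the results in the same order as the request, one option (not part of the original service) is to record each file's position in the request alongside its result and sort by that position before building the response. OrderedUploadDemo and UploadInRequestOrderAsync are hypothetical names; the delays are arranged so that later files finish first, to show that completion order does not affect the output.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class OrderedUploadDemo
{
    // Tags each result with its index in the request, then sorts by that index.
    public static async Task<List<string>> UploadInRequestOrderAsync(IList<string> fileNames, int limit)
    {
        var results = new ConcurrentBag<(int Index, string Name)>();
        var throttler = new SemaphoreSlim(limit, limit);

        var tasks = fileNames.Select(async (name, index) =>
        {
            await throttler.WaitAsync();
            try
            {
                // Later files get shorter delays, so they tend to finish first.
                await Task.Delay(10 * (fileNames.Count - index));
                results.Add((index, name));
            }
            finally
            {
                throttler.Release();
            }
        }).ToList();

        await Task.WhenAll(tasks);
        return results.OrderBy(r => r.Index).Select(r => r.Name).ToList();
    }
}
```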

To verify that the files have been uploaded into the backend SQL table, we run a SELECT query:

SELECT  [Id], 
        [FileName],
        [FileSize],
        [FileContent],
        [UploadDate],
        [UploadedBy]
FROM    [FileDemoDB].[dbo].[UploadedFiles]
WHERE   [Id] >= 176
ORDER BY [Id]

The table records match the files returned in the Web API JSON response, and their [Id] values reflect the order in which the records were saved.

Using the SemaphoreSlim synchronization object has allowed us to limit the number of concurrent operations that can run against a shared resource, such as a backend database, network connections, or server connections. The number of concurrent operations we choose depends on the limitations of the server or network resources that our application accesses.

That is all for today’s post.

I hope you found this post useful and informative.
