Azure Service Bus Queue - High number of incoming requests even though the client is turned off.

Michael Overgaard Nielsen 0 Reputation points
2024-10-09T14:09:10.01+00:00

I'm facing a challenge with an Azure Service Bus Queue. I have an ASP.NET Core API that forwards incoming requests to a queue. Additionally, I have a background service that checks for new messages every 10 minutes. The service is supposed to shut down the client, sender, and receiver each time after processing.

The big question: Why have incoming requests skyrocketed? I'm only making one call every 10 minutes and closing the client each time.

Second question: Will I be charged for these requests, or are they not counted?

Code for sender:

public async Task<CIP_ResponsObject> PublishToQueue(string? JSONObject, string? nameSpace, string? queueName)
        {
            CIP_ResponsObject Mes = new(); // Prepare a return object 
            if (!string.IsNullOrEmpty(nameSpace) && !string.IsNullOrEmpty(queueName))
            {
                if (!string.IsNullOrEmpty(JSONObject))
                {
                    ServiceBusClient client;
                    ServiceBusSender sender;
                    ServiceBusClientOptions clientOptions = new()
                    {
                        TransportType = ServiceBusTransportType.AmqpWebSockets
                    };
                    //client = new ServiceBusClient($"{nameSpace}.servicebus.windows.net", new DefaultAzureCredential(), clientOptions);
                    client = new ServiceBusClient(_configuration["AzureServiceBus:ConnectionString"], clientOptions);
                    sender = client.CreateSender(queueName);
                    try
                    {
                        // Use the client to send the message to the Service Bus queue
                        await sender.SendMessageAsync(new ServiceBusMessage(Encoding.UTF8.GetBytes(JSONObject)));
                        // Return success message
                        Mes.Status = 200;
                        Mes.Message = $"The message has been published to the queue.";
                        Mes.ResponsAt = DateTime.UtcNow;
                        return Mes;
                    }
                    finally
                    {
                        // Calling DisposeAsync on client types is required to ensure that network
                        // resources and other unmanaged objects are properly cleaned up.                       
                        await sender.DisposeAsync();
						await sender.CloseAsync();
						await client.DisposeAsync();						                       
					}
                }
                else
                {
                    // Return error message
                    Mes.Status = 400;
                    Mes.Error = "Message was empty";
                    Mes.Message = $"Message: {JSONObject}";
                    Mes.ResponsAt = DateTime.UtcNow;
                    return Mes;
                }
            }
            else
            {
                // Return error message
                Mes.Status = 400;
                Mes.Error = "Queue namespace or queue name was missing message";
                Mes.Message = $"Queue namespace: {nameSpace}. Queue name: {queueName}";
                Mes.ResponsAt = DateTime.UtcNow;
                return Mes;
            }
        }

Code for receiver:

public class QueueBackgroundService : BackgroundService
{
	private readonly IConfiguration Configuration;
	private readonly ILogger<QueueBackgroundService> _logger;
	private readonly string _connectionString;
	private readonly string _performanceQueueName;
	private readonly string _logQueueName;
	private readonly string _trackQueueName;
	private readonly int _callingInterval;
	private readonly int _maxMessages;
	public QueueBackgroundService(IConfiguration configuration, ILogger<QueueBackgroundService> logger)
	{
		Configuration = configuration;
		_logger = logger;
		_connectionString = Configuration["AzureServiceBus:ConnectionString"];
		_performanceQueueName = Configuration["AzureServiceBus:PerformanceQueueName"];
		_logQueueName = Configuration["AzureServiceBus:LogQueueName"];
		_trackQueueName = Configuration["AzureServiceBus:TrackQueueName"];
		_callingInterval = Int32.TryParse(Configuration["AzureServiceBus:CallingInterval"], out int callingInterval) ? callingInterval : 10;
		_maxMessages = Int32.TryParse(Configuration["AzureServiceBus:MaxMessages"], out int maxMessages) ? maxMessages : 10;
	}
	protected override async Task ExecuteAsync(CancellationToken stoppingToken)
	{
		while (!stoppingToken.IsCancellationRequested) 
		{
			try
			{				
				// Wait xxxx before asking again
				await Task.Delay(TimeSpan.FromSeconds(_callingInterval), stoppingToken);
				_logger.LogInformation($"Starting polling for messages at: {DateTimeOffset.Now}");
				// Process get messages from queue              
				await ReceiveMessagesAsync(stoppingToken);
			}
			catch (Exception ex)
			{
				_logger.LogError(ex.Message, "An error occurred while receiving messages.");
			}
		}
		_logger.LogInformation($"Stopping polling for messages at: {DateTimeOffset.Now}");
	}
	private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
	{
		ServiceBusClient _client = new(_connectionString);
		await using ServiceBusReceiver receiver = _client.CreateReceiver(_performanceQueueName);
		try
		{
			// Get messages
			var messages = await receiver.ReceiveMessagesAsync(maxMessages: _maxMessages, cancellationToken: cancellationToken);
			if (messages.Any())
			{
				// Process messages
				await ProcessMessagesAsync(messages);
				// Complete messages
				foreach (var message in messages)
					await receiver.CompleteMessageAsync(message, cancellationToken);
			}
			else
			{
				_logger.LogInformation("No messages received.");
			}
		}
		catch (ServiceBusException sbEx)
		{
			_logger.LogError(sbEx.Message, "An error occurred while receiving messages from the queue.");
		}
		finally
		{
			_logger.LogInformation($"Closing down and stopping polling for messages at: {DateTimeOffset.Now}");
			await receiver.DisposeAsync();
			await receiver.CloseAsync(cancellationToken);
			await _client.DisposeAsync();		
		}
	}
	private async Task ProcessMessagesAsync(IEnumerable<ServiceBusReceivedMessage> messages)
	{
		// Map messages to performance objects
		List<LogPerformance> performances = [];
		foreach (var message in messages)
		{
			LogPerformance performance = JsonSerializer.Deserialize<LogPerformance>(message.Body.ToString());
			performances.Add(performance);					
		}
		try
		{
			// Inserting into database
			using PerformanceDbContext context = new();
			context.LogPerformances.AddRange(performances);
			await context.SaveChangesAsync();
			_logger.LogInformation("Messages inserted into the database successfully.");
		}
		catch (Exception ex)
		{
			_logger.LogError(ex.Message, "Database insertion failed.");
		}
	}
}
Azure Service Bus
Azure Service Bus
An Azure service that provides cloud messaging as a service and hybrid integration.
645 questions
ASP.NET Core
ASP.NET Core
A set of technologies in the .NET Framework for building web applications and XML web services.
4,639 questions
C#
C#
An object-oriented and type-safe programming language that has its roots in the C family of languages and includes support for component-oriented programming.
11,054 questions
ASP.NET API
ASP.NET API
ASP.NET: A set of technologies in the .NET Framework for building web applications and XML web services. API: A software intermediary that allows two applications to interact with each other.
346 questions
0 comments No comments
{count} votes

Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.