Azure Service Bus
An Azure service that provides cloud messaging as a service and hybrid integration.
645 questions
This browser is no longer supported.
Upgrade to Microsoft Edge to take advantage of the latest features, security updates, and technical support.
I'm facing a challenge with an Azure Service Bus Queue. I have an ASP.NET Core API that forwards incoming requests to a queue. Additionally, I have a background service that checks for new messages every 10 minutes. The service is supposed to shut down the client, sender, and receiver each time after processing.
The big question: Why have incoming requests skyrocketed? I'm only making one call every 10 minutes and closing the client each time.
Second question: Will I be charged for these requests, or are they not counted?
Code for sender:
/// <summary>
/// Publishes a JSON payload as a single message to an Azure Service Bus queue.
/// </summary>
/// <param name="JSONObject">JSON text to send; it is encoded as UTF-8 bytes. Must not be null or empty.</param>
/// <param name="nameSpace">
/// Service Bus namespace. It is only checked for presence: the client is built from the
/// configured "AzureServiceBus:ConnectionString" value, not from this argument.
/// </param>
/// <param name="queueName">Name of the queue the message is sent to.</param>
/// <returns>
/// A response object with status 200 when the message was sent, or status 400 when the
/// namespace, queue name or payload is missing. Exceptions raised while creating the
/// client or sending are not caught here and propagate to the caller.
/// </returns>
public async Task<CIP_ResponsObject> PublishToQueue(string? JSONObject, string? nameSpace, string? queueName)
{
    CIP_ResponsObject Mes = new(); // Prepare a return object

    // Guard: both the namespace and the queue name have to be supplied.
    if (string.IsNullOrEmpty(nameSpace) || string.IsNullOrEmpty(queueName))
    {
        Mes.Status = 400;
        Mes.Error = "Queue namespace or queue name was missing message";
        Mes.Message = $"Queue namespace: {nameSpace}. Queue name: {queueName}";
        Mes.ResponsAt = DateTime.UtcNow;
        return Mes;
    }

    // Guard: there is nothing to publish without a payload.
    if (string.IsNullOrEmpty(JSONObject))
    {
        Mes.Status = 400;
        Mes.Error = "Message was empty";
        Mes.Message = $"Message: {JSONObject}";
        Mes.ResponsAt = DateTime.UtcNow;
        return Mes;
    }

    ServiceBusClientOptions clientOptions = new()
    {
        TransportType = ServiceBusTransportType.AmqpWebSockets
    };

    // 'await using' disposes the sender first and then the client on every exit path,
    // including the case where CreateSender or SendMessageAsync throws. No separate
    // CloseAsync call is made after disposal.
    // NOTE(review): a new client (and connection) is created for every call; consider
    // sharing one long-lived client if this method is on a hot request path.
    await using ServiceBusClient client = new ServiceBusClient(_configuration["AzureServiceBus:ConnectionString"], clientOptions);
    await using ServiceBusSender sender = client.CreateSender(queueName);

    // Use the client to send the message to the Service Bus queue
    await sender.SendMessageAsync(new ServiceBusMessage(Encoding.UTF8.GetBytes(JSONObject)));

    // Return success message
    Mes.Status = 200;
    Mes.Message = "The message has been published to the queue.";
    Mes.ResponsAt = DateTime.UtcNow;
    return Mes;
}
Code for receiver:
/// <summary>
/// Background service that, after each configured delay, receives a batch of messages from
/// the performance queue, stores them through <c>PerformanceDbContext</c> and then completes them.
/// </summary>
public class QueueBackgroundService : BackgroundService
{
    // Fallback used when a numeric setting is missing, unparsable or not positive.
    private const int DefaultSettingValue = 10;

    private readonly IConfiguration _configuration;
    private readonly ILogger<QueueBackgroundService> _logger;
    private readonly string _connectionString;
    private readonly string _performanceQueueName;
    private readonly string _logQueueName;
    private readonly string _trackQueueName;
    private readonly int _callingInterval;
    private readonly int _maxMessages;

    /// <summary>
    /// Reads the "AzureServiceBus:*" settings from configuration.
    /// </summary>
    /// <param name="configuration">Application configuration holding the Service Bus settings.</param>
    /// <param name="logger">Logger used for progress and error reporting.</param>
    public QueueBackgroundService(IConfiguration configuration, ILogger<QueueBackgroundService> logger)
    {
        _configuration = configuration;
        _logger = logger;
        _connectionString = _configuration["AzureServiceBus:ConnectionString"];
        _performanceQueueName = _configuration["AzureServiceBus:PerformanceQueueName"];
        _logQueueName = _configuration["AzureServiceBus:LogQueueName"];
        _trackQueueName = _configuration["AzureServiceBus:TrackQueueName"];
        _callingInterval = ReadPositiveInt(_configuration["AzureServiceBus:CallingInterval"], DefaultSettingValue);
        _maxMessages = ReadPositiveInt(_configuration["AzureServiceBus:MaxMessages"], DefaultSettingValue);
    }

    /// <summary>
    /// Parses a configuration value as a positive integer.
    /// </summary>
    /// <param name="raw">Raw configuration text; may be null.</param>
    /// <param name="fallback">Value returned when <paramref name="raw"/> is not a positive integer.</param>
    /// <returns>The parsed value when it is greater than zero; otherwise <paramref name="fallback"/>.</returns>
    private static int ReadPositiveInt(string? raw, int fallback)
    {
        // Zero or negative values are rejected: a negative delay makes Task.Delay throw on
        // every iteration, which would turn the polling loop into a busy error loop.
        return int.TryParse(raw, out int parsed) && parsed > 0 ? parsed : fallback;
    }

    /// <summary>
    /// Polling loop: waits for the configured interval, then receives and processes one batch.
    /// Runs until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Wait before asking again.
                // NOTE(review): the configured interval is applied as SECONDS here; confirm
                // that this matches the intended polling period.
                await Task.Delay(TimeSpan.FromSeconds(_callingInterval), stoppingToken);
                _logger.LogInformation("Starting polling for messages at: {Time}", DateTimeOffset.Now);

                // Get and process messages from the queue
                await ReceiveMessagesAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested while waiting or receiving; this is not an error.
                break;
            }
            catch (Exception ex)
            {
                // Pass the exception object itself so the logger records type and stack trace.
                _logger.LogError(ex, "An error occurred while receiving messages.");
            }
        }

        _logger.LogInformation("Stopping polling for messages at: {Time}", DateTimeOffset.Now);
    }

    /// <summary>
    /// Opens a client and receiver for the performance queue, receives up to the configured
    /// number of messages, processes them and completes them.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the receive, store and complete calls.</param>
    private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
    {
        // 'await using' disposes the receiver and then the client on every exit path, so no
        // explicit DisposeAsync/CloseAsync calls are needed (and a cancelled close can no
        // longer prevent the client from being disposed).
        await using ServiceBusClient client = new(_connectionString);
        await using ServiceBusReceiver receiver = client.CreateReceiver(_performanceQueueName);

        try
        {
            // Get messages
            var messages = await receiver.ReceiveMessagesAsync(maxMessages: _maxMessages, cancellationToken: cancellationToken);

            if (messages.Count == 0)
            {
                _logger.LogInformation("No messages received.");
                return;
            }

            // Process messages; this throws when the batch could not be stored, in which
            // case the completion loop below is skipped and the messages are not completed.
            await ProcessMessagesAsync(messages, cancellationToken);

            // Complete messages
            foreach (var message in messages)
            {
                await receiver.CompleteMessageAsync(message, cancellationToken);
            }
        }
        catch (ServiceBusException sbEx)
        {
            _logger.LogError(sbEx, "An error occurred while receiving messages from the queue.");
        }
        finally
        {
            _logger.LogInformation("Closing down and stopping polling for messages at: {Time}", DateTimeOffset.Now);
        }
    }

    /// <summary>
    /// Deserializes each message body into a <c>LogPerformance</c> and inserts the batch into the database.
    /// </summary>
    /// <param name="messages">Messages whose bodies contain JSON-serialized <c>LogPerformance</c> objects.</param>
    /// <param name="cancellationToken">Token used to cancel the database save.</param>
    /// <exception cref="JsonException">A message body is invalid JSON or deserializes to null.</exception>
    /// <remarks>
    /// Database failures are logged and rethrown so that the caller does not complete
    /// messages that were never stored.
    /// </remarks>
    private async Task ProcessMessagesAsync(IEnumerable<ServiceBusReceivedMessage> messages, CancellationToken cancellationToken = default)
    {
        // Map messages to performance objects
        List<LogPerformance> performances = [];
        foreach (var message in messages)
        {
            LogPerformance performance = JsonSerializer.Deserialize<LogPerformance>(message.Body.ToString())
                ?? throw new JsonException("Message body deserialized to null.");
            performances.Add(performance);
        }

        try
        {
            // Inserting into database
            using PerformanceDbContext context = new();
            context.LogPerformances.AddRange(performances);
            await context.SaveChangesAsync(cancellationToken);
            _logger.LogInformation("Messages inserted into the database successfully.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Database insertion failed.");
            throw;
        }
    }
}