Processamento assíncrono e queues no Windows Azure
Olá pessoal,
Hoje vou fazer um post rápido sobre como trabalhar com processamento assíncrono e queues no Windows Azure. Este padrão de processamento assíncrono é muito útil para liberar o processamento do lado do cliente, sem ficar bloqueado esperando algum tipo de retorno e permite a possibilidade de escalar o lado servidor conforme a demanda de processamento de mensagens, geralmente implementado através de um worker role.
Pela imagem acima, é possível perceber que o padrão consiste de um lado que grava uma mensagem em uma fila e de outro lado que lê esta mensagem para realizar o processamento.
Para gravar a ler a mensagem da fila é necessário utilizar a classe CloudQueueClient, disponível no assembly Microsoft.WindowsAzure.StorageClient. Do lado de quem grava a mensagem, por exemplo, uma Web Role o código utilizado seria similar ao abaixo:
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
CloudQueueClient queueStorage = storageAccount.CreateCloudQueueClient();
var queue = queueStorage.GetQueueReference(queueName.ToLower());
queue.CreateIfNotExist();
var message = new CloudQueueMessage(conteudoMensagem);
queue.AddMessage(message);
Já para ler a mensagem, o Worker Role deve ter uma implementação muito parecida com a abaixo, tratando detalhes da queue no método OnStart e tratando as mensagens no método Run.
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// read storage account configuration settings
CloudStorageAccount.SetConfigurationSettingPublisher((configName, configSetter) =>
{
configSetter(RoleEnvironment.GetConfigurationSettingValue(configName));
});
var storageAccount = CloudStorageAccount.FromConfigurationSetting("WindowsAzureStorageConnectionString");
// initialize queue storage
CloudQueueClient queueStorage = storageAccount.CreateCloudQueueClient();
queue = queueStorage.GetQueueReference(RoleEnvironment.GetConfigurationSettingValue("QueueName"));
bool storageInitialized = false;
while (!storageInitialized)
{
try
{
// create the message queue(s)
queue.CreateIfNotExist();
storageInitialized = true;
}
catch (StorageClientException e)
{
if (e.ErrorCode == StorageErrorCode.TransportError)
{
Trace.TraceError("Storage services initialization failure. "
+ "Check your storage account configuration settings. If running locally, "
+ "ensure that the Development Storage service is running. Message: '{0}'", e.Message);
System.Threading.Thread.Sleep(5000);
}
else
{
throw;
}
}
}
return base.OnStart();
}
public override void Run()
{
Trace.TraceInformation("Listening for queue messages...");
while (true)
{
try
{
// retrieve a new message from the queue
CloudQueueMessage msg = queue.GetMessage();
if (msg != null)
{
// parse message retrieved from queue
var messageContent = msg.AsString;
ProcessMessage(messageContent);
// remove message from queue
queue.DeleteMessage(msg);
}
else
{
System.Threading.Thread.Sleep(1000);
}
}
catch (StorageClientException e)
{
Trace.TraceError("Exception when processing queue item. Message: '{0}'", e.Message);
System.Threading.Thread.Sleep(5000);
}
}
}