Processamento assíncrono e queues no Windows Azure

Olá pessoal,

Hoje vou fazer um post rápido sobre como trabalhar com processamento assíncrono e queues no Windows Azure. Este padrão de processamento assíncrono é muito útil para liberar o processamento do lado do cliente, sem ficar bloqueado esperando algum tipo de retorno e permite a possibilidade de escalar o lado servidor conforme a demanda de processamento de mensagens, geralmente implementado através de um worker role.

image

Pela imagem acima, é possível perceber que o padrão consiste de um lado que grava uma mensagem em uma fila e de outro lado que lê esta mensagem para realizar o processamento.

Para gravar a ler a mensagem da fila é necessário utilizar a classe CloudQueueClient, disponível no assembly Microsoft.WindowsAzure.StorageClient. Do lado de quem grava a mensagem, por exemplo, uma Web Role o código utilizado seria similar ao abaixo:

 CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString); 
CloudQueueClient queueStorage = storageAccount.CreateCloudQueueClient();
var queue = queueStorage.GetQueueReference(queueName.ToLower());
queue.CreateIfNotExist();
var message = new CloudQueueMessage(conteudoMensagem);
queue.AddMessage(message);
  

Já para ler a mensagem, o Worker Role deve ter uma implementação muito parecida com a abaixo, tratando detalhes da queue no método OnStart e tratando as mensagens no método Run.

  
 public override bool OnStart()
{
    // Set the maximum number of concurrent connections 
    ServicePointManager.DefaultConnectionLimit = 12;

    // read storage account configuration settings
    CloudStorageAccount.SetConfigurationSettingPublisher((configName, configSetter) =>
    {
        configSetter(RoleEnvironment.GetConfigurationSettingValue(configName));
    });

    var storageAccount = CloudStorageAccount.FromConfigurationSetting("WindowsAzureStorageConnectionString");

    // initialize queue storage 
    CloudQueueClient queueStorage = storageAccount.CreateCloudQueueClient();
    queue = queueStorage.GetQueueReference(RoleEnvironment.GetConfigurationSettingValue("QueueName"));

    bool storageInitialized = false;
    while (!storageInitialized)
    {
        try
        {
            // create the message queue(s)
            queue.CreateIfNotExist();

            storageInitialized = true;
        }
        catch (StorageClientException e)
        {
            if (e.ErrorCode == StorageErrorCode.TransportError)
            {
                Trace.TraceError("Storage services initialization failure. "
                    + "Check your storage account configuration settings. If running locally, "
                    + "ensure that the Development Storage service is running. Message: '{0}'", e.Message);
                System.Threading.Thread.Sleep(5000);
            }
            else
            {
                throw;
            }
        }
    }

    return base.OnStart();
}
 public override void Run()
{
    Trace.TraceInformation("Listening for queue messages...");

    while (true)
    {
        try
        {
            // retrieve a new message from the queue
            CloudQueueMessage msg = queue.GetMessage();
            if (msg != null)
            {
                // parse message retrieved from queue
                var messageContent = msg.AsString;

                ProcessMessage(messageContent);

                // remove message from queue
                queue.DeleteMessage(msg);
            }
            else
            {
                System.Threading.Thread.Sleep(1000);
            }
        }
        catch (StorageClientException e)
        {
            Trace.TraceError("Exception when processing queue item. Message: '{0}'", e.Message);
            System.Threading.Thread.Sleep(5000);
        }
    }
}