Поделиться через


Основы хранилища Windows Azure. Очереди. (ru-RU)

Очереди Windows Azure

http://hpcru.files.wordpress.com/2012/03/clip_image0011.png?w=343&h=271

1) Очереди используются для хранения сообщения

2) Используют FIFO, однонаправленные

3) Имя очереди должно быть именем в нижнем регистре и URL-friendly

.http://hpcru.files.wordpress.com/2012/03/clip_image002_thumb.png?w=343&h=141

Сообщения:

  1. Очередь может содержать неограниченное количество сообщений
  2. Сообщения должны быть сериализуемы как XML
  3. Размер ограничен 64Кб
  4. Обычно используется паттерн work ticket
  5. Сборщик мусора для очереди запускается раз в неделю

http://hpcru.files.wordpress.com/2012/03/image_thumb42.png?w=351&h=260

Ссылка на очередь выглядит стандартно для именования сущностей в сервисах хранилища Windows Azure:

http://<account>.queue.core.windows.net/<QueueName>

К основным операциям над очередями с использованием клиента облачных очередей CloudQueueClient можно отнести получение ссылки на очередь (GetQueueReference, возвращает объект CloudQueue**)** и получение списка очередей (ListQueues).

var queueClient =  CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queueClient.GetQueueReference("messages");

У объекта CloudQueue доступно несколько операций по управлению очередью:

  • SetMetadata: Определяет метаданные для очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

queue.CreateIfNotExist();

queue.Metadata.Add(new NameValueCollection()

{

{"Name","Ahriman"}.

{"DateOfMsgCreation", DateTime.UtcNow.ToString()}

});

queue.SetMetadata();
  • FetchAttributes: Загружает метаданные и атрибуты.
  • Clear: Очищает очередь.
  • Create: Создает очередь. Если очередь уже существует, будет выброшено исключение.

Асинхронное создание очереди:

var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

if (!queue.Exists)){

queue.BeginCreate(EndCreateQueue, queue);

}

protected static void EndCreateQueue(IAsyncResult result)

{

var queue = result.AsyncState as CloudQueue;

queue.EndCreate(result);

}
  • CreateIfNotExists: Создает очередь в том случае, если ее не существует.
  • Delete: Удаляет очередь.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

queue.Delete();
  • Exists: Проверяет существование очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

if (!queue.Exists())

{

queue.Create();

}
  • AddMessage: Добавляет сообщение в очередь.
  • DeleteMessage: Удаляет сообщение из очереди.
  • GetMessage: Возвращает следующее сообщение из очереди. В это время сообщение становится “невидимым” для других обработчиков.
  • GetMessages: Возвращает определенное количество сообщений из очереди.
  • PeekMessage: Возвращает сообщение из очереди, оставляя его видимым для других обработчиков (“подсматривает”).
  • PeekMessages: Возвращает определенное количество сообщений из очереди, оставляя их видимыми для других обработчиков.
  • UpdateMessage: Изменяет содержимое или время видимости сообщения.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

//создание объекта заранее определенного класса, помеченного как Serialize

var msg = new Message {id = Guid.NewGuid().ToString(), name = "Ahriman"};

//создание объекта сообщения облачной очереди и передача в качестве аргумента созданного //сериализованного (с помощью какого-либо реализова��ного метода) объекта msg

var queueMessage = new CloudQueueMessage(msg.SerializeToBinary());

//добавление сообщения в очередь. Первый аргумент - сообщение, второй - время жизни сообщения //(максимум - 7 дней, нельзя забывать про Garbage Collector), третий, необязательный аргумент - //время "невидимости" сообщения для обработчика после добавления сообщения в очередь

queue.AddMessage(queueMessage,TimeSpan.FromMinutes(5));

 

//”подсматривание”, но не забор сообщения из очереди

var peekedMessage = queue.PeekMessage();

//получение сообщения из начала очереди
var retrievedMessage = queue.GetMessage();

 

retrievedMessage.SetMessageContent("Some update.");

 

//сообщение в очереди обновляется и становится сразу же видимым для обработчиков

queue.UpdateMessage(retrievedMessage,TimeSpan.FromSeconds(0.0), MessageUpdateFields.Content | MessageUpdateFields.Visibility);


//печать значения свойства Id возвращенного объекта Message.

System.Console.WriteLine(retrievedMessage.FromMessage<Message>().Id);

//удаление сообщения из очереди

queue.DeleteMessage(retrievedMessage);

RetrieveApproximateMessageCount:

Возвращает примерно количество сообщений в очереди. Примерное – потому, что сообщение могут быть добавлены или удалены после того, как вы отправите запрос на количество сообщений очереди.

var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();

var queue = queue.Client.GetQueueReference("messages");

int msgCount = queue.RetrieveApproximateMessageCount();

int cachedMsgCount = queue.ApproximateMessageCount;

Опрос очереди на наличие нового сообщения в обработчике принято выполнять в бесконечном цикле.

while (true)
 {
     var msg = queue.GetMessage();
     if (msg != null)
          {
                 string retrievedMsg = msg.AsString;
                 //логика
                 query.DeleteMessage(msg);
           }
      else
       {
            Thread.Sleep(1000);
       }

Обратите внимание, что использование Thread.Sleep необосновано с экономической точки зрения, поэтому рекомендуется использовать Timer.

Усеченный экспоненциальный алгоритм отката

Одним из подходов к опросу очереди и облегчению нагрузки на очередь является использование усеченного экспоненциального алгоритма отката. Суть заключается в том, что каждый “пустой” опрос увеличивает интервал опроса вдвое, не пустой же опрос ставит интервал обратно в 1.

while (true)
 {
     var msg = queue.GetMessage();
     if (msg != null)
          {
                 string retrievedMsg = msg.AsString;
                 //логика
                 query.DeleteMessage(msg);

                 if (gradualDecrease)
                      if (curInterval > intervalFloor) curInterval = curInterval /2;
                      else curInterval = intervalFloor;
                 else curInterval = intervalFloor;
           }
      else
       {
            if (curInterval <intervalCeiling) curInterval = curInterval * 2;

       }

Долгие очереди

Очередь, собирающая сообщения в период отключённого потребителя, называется долгой. В этом случае потребитель очереди может выходить в онлайн раз в день и забирать все сообщения, затем снова отключаться. Полезно для обмена сообщениями с внешними вендорами, а также для сокращения оплаты за время выполнения сервиса (в том случае, если потребитель является, например, Worker-ролью в Windows Azure).

Рекомендация

В качестве рекомендаций можно упомянуть разбиение сложного процесса на состояние, при этом каждое состояние является отдельным процессом-потребителем, опрашивающим определенную очередь.

http://hpcru.files.wordpress.com/2012/03/image_thumb43.png?w=325&h=201

Неоптимальная архитектура – один большой потребитель обрабатывает одну очередь. Является негибким подходом.

http://hpcru.files.wordpress.com/2012/03/image_thumb44.png?w=327&h=235

Архитектура, являющаяся более оптимальной с позиции гибкости – разделенная на различных потребителей и очередей, каждый из которых занимается собственной задачей.