Основы хранилища Windows Azure. Очереди. (ru-RU)
Очереди Windows Azure
http://hpcru.files.wordpress.com/2012/03/clip_image0011.png?w=343&h=271
1) Очереди используются для хранения сообщения
2) Используют FIFO, однонаправленные
3) Имя очереди должно быть именем в нижнем регистре и URL-friendly
.http://hpcru.files.wordpress.com/2012/03/clip_image002_thumb.png?w=343&h=141
Сообщения:
- Очередь может содержать неограниченное количество сообщений
- Сообщения должны быть сериализуемы как XML
- Размер ограничен 64Кб
- Обычно используется паттерн work ticket
- Сборщик мусора для очереди запускается раз в неделю
http://hpcru.files.wordpress.com/2012/03/image_thumb42.png?w=351&h=260
Ссылка на очередь выглядит стандартно для именования сущностей в сервисах хранилища Windows Azure:
http://<account>.queue.core.windows.net/<QueueName>
К основным операциям над очередями с использованием клиента облачных очередей CloudQueueClient можно отнести получение ссылки на очередь (GetQueueReference, возвращает объект CloudQueue**)** и получение списка очередей (ListQueues).
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();
var queue = queueClient.GetQueueReference("messages");
У объекта CloudQueue доступно несколько операций по управлению очередью:
- SetMetadata: Определяет метаданные для очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();
var queue = queue.Client.GetQueueReference("messages");
queue.CreateIfNotExist();
queue.Metadata.Add(new NameValueCollection()
{
{"Name","Ahriman"}.
{"DateOfMsgCreation", DateTime.UtcNow.ToString()}
});
queue.SetMetadata();
- FetchAttributes: Загружает метаданные и атрибуты.
- Clear: Очищает очередь.
- Create: Создает очередь. Если очередь уже существует, будет выброшено исключение.
Асинхронное создание очереди:
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();
var queue = queue.Client.GetQueueReference("messages");
if (!queue.Exists)){
queue.BeginCreate(EndCreateQueue, queue);
}
protected static void EndCreateQueue(IAsyncResult result)
{
var queue = result.AsyncState as CloudQueue;
queue.EndCreate(result);
}
- CreateIfNotExists: Создает очередь в том случае, если ее не существует.
- Delete: Удаляет очередь.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();
var queue = queue.Client.GetQueueReference("messages");
queue.Delete();
- Exists: Проверяет существование очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();
var queue = queue.Client.GetQueueReference("messages");
if (!queue.Exists())
{
queue.Create();
}
- AddMessage: Добавляет сообщение в очередь.
- DeleteMessage: Удаляет сообщение из очереди.
- GetMessage: Возвращает следующее сообщение из очереди. В это время сообщение становится “невидимым” для других обработчиков.
- GetMessages: Возвращает определенное количество сообщений из очереди.
- PeekMessage: Возвращает сообщение из очереди, оставляя его видимым для других обработчиков (“подсматривает”).
- PeekMessages: Возвращает определенное количество сообщений из очереди, оставляя их видимыми для других обработчиков.
- UpdateMessage: Изменяет содержимое или время видимости сообщения.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();
var queue = queue.Client.GetQueueReference("messages");
//создание объекта заранее определенного класса, помеченного как Serialize
var msg = new Message {id = Guid.NewGuid().ToString(), name = "Ahriman"};
//создание объекта сообщения облачной очереди и передача в качестве аргумента созданного //сериализованного (с помощью какого-либо реализова��ного метода) объекта msg
var queueMessage = new CloudQueueMessage(msg.SerializeToBinary());
//добавление сообщения в очередь. Первый аргумент - сообщение, второй - время жизни сообщения //(максимум - 7 дней, нельзя забывать про Garbage Collector), третий, необязательный аргумент - //время "невидимости" сообщения для обработчика после добавления сообщения в очередь
queue.AddMessage(queueMessage,TimeSpan.FromMinutes(5));
//”подсматривание”, но не забор сообщения из очереди
var peekedMessage = queue.PeekMessage();
//получение сообщения из начала очереди
var retrievedMessage = queue.GetMessage();
retrievedMessage.SetMessageContent("Some update.");
//сообщение в очереди обновляется и становится сразу же видимым для обработчиков
queue.UpdateMessage(retrievedMessage,TimeSpan.FromSeconds(0.0), MessageUpdateFields.Content | MessageUpdateFields.Visibility);
//печать значения свойства Id возвращенного объекта Message.
System.Console.WriteLine(retrievedMessage.FromMessage<Message>().Id);
//удаление сообщения из очереди
queue.DeleteMessage(retrievedMessage);
RetrieveApproximateMessageCount:
Возвращает примерно количество сообщений в очереди. Примерное – потому, что сообщение могут быть добавлены или удалены после того, как вы отправите запрос на количество сообщений очереди.
var queueClient = CloudStorageAccount.FromConfigurationSetting("DataStorage").CreateCloudQueueClient();
var queue = queue.Client.GetQueueReference("messages");
int msgCount = queue.RetrieveApproximateMessageCount();
int cachedMsgCount = queue.ApproximateMessageCount;
Опрос очереди на наличие нового сообщения в обработчике принято выполнять в бесконечном цикле.
while (true)
{
var msg = queue.GetMessage();
if (msg != null)
{
string retrievedMsg = msg.AsString;
//логика
query.DeleteMessage(msg);
}
else
{
Thread.Sleep(1000);
}
Обратите внимание, что использование Thread.Sleep необосновано с экономической точки зрения, поэтому рекомендуется использовать Timer.
Усеченный экспоненциальный алгоритм отката
Одним из подходов к опросу очереди и облегчению нагрузки на очередь является использование усеченного экспоненциального алгоритма отката. Суть заключается в том, что каждый “пустой” опрос увеличивает интервал опроса вдвое, не пустой же опрос ставит интервал обратно в 1.
while (true)
{
var msg = queue.GetMessage();
if (msg != null)
{
string retrievedMsg = msg.AsString;
//логика
query.DeleteMessage(msg);
if (gradualDecrease)
if (curInterval > intervalFloor) curInterval = curInterval /2;
else curInterval = intervalFloor;
else curInterval = intervalFloor;
}
else
{
if (curInterval <intervalCeiling) curInterval = curInterval * 2;
}
Долгие очереди
Очередь, собирающая сообщения в период отключённого потребителя, называется долгой. В этом случае потребитель очереди может выходить в онлайн раз в день и забирать все сообщения, затем снова отключаться. Полезно для обмена сообщениями с внешними вендорами, а также для сокращения оплаты за время выполнения сервиса (в том случае, если потребитель является, например, Worker-ролью в Windows Azure).
Рекомендация
В качестве рекомендаций можно упомянуть разбиение сложного процесса на состояние, при этом каждое состояние является отдельным процессом-потребителем, опрашивающим определенную очередь.
http://hpcru.files.wordpress.com/2012/03/image_thumb43.png?w=325&h=201
Неоптимальная архитектура – один большой потребитель обрабатывает одну очередь. Является негибким подходом.
http://hpcru.files.wordpress.com/2012/03/image_thumb44.png?w=327&h=235
Архитектура, являющаяся более оптимальной с позиции гибкости – разделенная на различных потребителей и очередей, каждый из которых занимается собственной задачей.