Введение в надежные параллельные очереди в Azure Service Fabric
Надежная параллельная очередь — это асинхронная, транзакционная и реплицируемая очередь, которая обладает высокой степенью параллелизма при операциях постановки в очередь и вывода из нее. Она предназначена для обеспечения высокой пропускной способности и низкой задержки. Она ослабляет строгое упорядочение FIFO, гарантируемое надежной очередью, и вместо этого обеспечивает упорядочение наилучшим возможным образом.
Программные интерфейсы
Параллельная очередь | Надежная параллельная очередь |
---|---|
void Enqueue(T item) | Task EnqueueAsync(ITransaction tx, T item) |
bool TryDequeue(out T result) | Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx) |
int Count() | long Count() |
Сравнение с надежной очередью
Надежная параллельная очередь предлагается в качестве альтернативы надежной очереди. Ее следует использовать в случаях, когда строгое упорядочение FIFO не требуется, так как модель FIFO требует уменьшения степени параллелизма. Надежная очередь использует блокировки для принудительного упорядочивания FIFO, разрешая ставить в очередь и выводить из нее одновременно максимум одну транзакцию за раз. Для сравнения надежная параллельная очередь снижает ограничения упорядочивания и позволяет чередовать операции постановки в очередь и вывода из нее любому количеству параллельных транзакций. Так обеспечивается упорядочивание наилучшим возможным образом, однако относительный порядок двух значений в надежной параллельной очереди не гарантируется.
Надежная параллельная очередь обеспечивает высокую пропускную способность и более низкую задержку, чем надежная очередь, во всех случаях, когда имеется несколько параллельных транзакций, выполняющих постановку в очередь и вывод из нее.
Пример использования надежной параллельной очереди — это сценарий с очередью сообщений. В этом сценарии один или несколько отправителей создают и добавляют элементы в очередь и один или несколько получателей извлекают сообщения из очереди и обрабатывают их. Несколько отправителей и получателей могут работать независимо, используя параллельные транзакции для обработки очередей.
Правила использования
- Очередь предполагает, что элементы в ней имеют низкий период удержания. То есть элементы не будут оставаться в очереди в течение длительного времени.
- Очередь не гарантирует строгий порядок FIFO.
- Очередь не считывает свои собственные записи. Если элемент поставлен в очередь внутри транзакции, он не будет виден для средства вывода из очереди внутри той же транзакции.
- Операции вывода из очереди не изолированы друг от друга. Если элемент A выведен из очереди в транзакции txnA, тогда как txnA не зафиксирована, элемент A не будет виден параллельной транзакции txnB. Если транзакция txnA прервется, элемент A немедленно станет видимым для транзакции txnB.
- Поведение TryPeekAsync может быть реализовано с использонием метода TryDequeueAsync с последующим прерыванием транзакции. Пример такого поведения можно найти в разделе "Шаблоны программирования".
- Число является нетранзакционным. Это число позволяет получить представление о количестве элементов в очереди, но так как оно представляет данные на определенный момент времени, на него нельзя положиться.
- Ресурсоемкая обработка выведенных из очереди элементов не должна выполняться во время транзакции, чтобы избежать продолжительных транзакций, которые могут оказывать влияние на работу системы.
Фрагменты кода
Рассмотрим несколько фрагментов кода и ожидаемые выходные данные. Обработка исключений не рассматривается в этом разделе.
Создание экземпляра
Создание экземпляра надежной параллельной очереди аналогично созданию любой другой надежной коллекции.
IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");
EnqueueAsync
Вот несколько фрагментов кода по использованию EnqueueAsync, после которых приведены ожидаемые выходные данные.
- Вариант 1. Одно задание постановки в очередь
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
Предположим, что задача выполнена успешно и одновременные транзакции, изменяющие очередь, отсутствовали. Очередь может содержать элементы в любом из следующих порядков:
10, 20
20, 10
- Вариант 2. Параллельное задание постановки в очередь
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
await txn.CommitAsync();
}
Предположим, что задачи успешно завершены, они выполнялись параллельно и не было других одновременных транзакций, изменяющих очередь. Невозможно сделать вывод о порядке элементов в очереди. В этом фрагменте кода элементы могут отображаться в любом из 4 возможных порядков. Очередь попытается разместить элементы в первоначальном порядке (постановки в очередь), но может быть вынуждена переупорядочить их из-за одновременных операций или сбоев.
DequeueAsync
Вот несколько фрагментов кода по использованию метода TryDequeueAsync, за которыми следуют ожидаемые выходные данные. Предположим, что очередь уже заполнена следующими элементами:
10, 20, 30, 40, 50, 60
- Вариант 1. Одно задание вывода из очереди
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await txn.CommitAsync();
}
Предположим, что задача выполнена успешно и одновременные транзакции, изменяющие очередь, отсутствовали. Так как нельзя сделать вывод о порядке элементов в очереди, любые три элемента могут быть выведены из нее в любом порядке. Очередь попытается разместить элементы в первоначальном порядке (постановки в очередь), но может быть вынуждена переупорядочить их из-за одновременных операций или сбоев.
- Вариант 2. Параллельное задание вывода из очереди
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
Предположим, что задачи успешно завершены, они выполнялись параллельно и не было других одновременных транзакций, изменяющих очередь. Так как невозможно сделать вывод о порядке элементов в очереди, списки dequeue1 и dequeue2 будут содержать любые два элемента в любом порядке.
Один и тот же элемент не будет содержаться в обоих списках. Поэтому если dequeue1 имеет элемент 10, 30, то deque2 будет иметь элементы 20, 40.
- Вариант 3. Упорядочение вывода из очереди с прерыванием транзакции
Прерывание транзакции во время вывода из очереди возвращает элементы обратно в начало очереди. Порядок, в котором элементы возвращаются в начало очереди, не гарантируется. Рассмотрим следующий пример:
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
// Abort the transaction
await txn.AbortAsync();
}
Предположим, что элементы выводились из очереди в следующем порядке:
10, 20
Когда мы прервем транзакцию, элементы будут добавлены обратно в начало очереди в одном из следующих порядков:
10, 20
20, 10
То же самое верно для всех случаев, когда транзакция не была успешно зафиксирована.
Шаблоны программирования
В этом разделе мы рассмотрим несколько шаблонов программирования, которые могут быть полезны при использовании надежной параллельной очереди.
Вывод из очереди в пакетном режиме
Этот шаблон программирования подразумевает, что задача получателя выполняет вывод из очереди в пакетном режиме, вместо того чтобы выполнять один вывод за раз. Пользователь может регулировать задержки между пакетами и размеры пакетов. В следующем фрагменте кода показана эта модель программирования. Учтите, что в этом примере обработка выполняется после фиксирования транзакции, поэтому, если во время обработки произойдет ошибка, необработанные элементы будут потеряны. Кроме того, обработка может быть выполнена в пределах области транзакции, но это может отрицательно сказаться на производительности и требует обработки уже обработанных элементов.
int batchSize = 5;
long delayMs = 100;
while(!cancellationToken.IsCancellationRequested)
{
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
for(int i = 0; i < batchSize; ++i)
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
else
{
// else break the for loop
break;
}
}
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
int delayFactor = batchSize - processItems.Count;
await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
Оптимальная обработка на основе уведомлений
Другой интересный шаблон программирования использует API подсчета. Здесь мы можем реализовать обработку на основе уведомлений для очереди. Число элементов в очереди может использоваться для регулирования заданий постановки в очередь и вывода из нее. Обратите внимание, что, как и в предыдущем примере, так как обработка происходит за пределами транзакции, необработанные элементы могут быть потеряны, если во время обработки возникнет ошибка.
int threshold = 5;
long delayMs = 1000;
while(!cancellationToken.IsCancellationRequested)
{
while (this.Queue.Count < threshold)
{
cancellationToken.ThrowIfCancellationRequested();
// If the queue does not have the threshold number of items, delay the task and check again
await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
}
// If there are approximately threshold number of items, try and process the queue
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
} while (processItems.Count < threshold && ret.HasValue);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
}
Оптимальная очистка
Очистка очереди не гарантируется из-за свойства параллельности структуры данных. Вполне возможно, что даже в очереди не выполняются пользовательские операции, конкретный вызов метода TryDequeueAsync может не вернуть элемент, который ранее был поставлен в очередь и зафиксирован. Поставленный в очередь элемент в конечном счете станет видимым для операции вывода из очереди, но без внешнего механизма обмена данными независимый получатель не узнает, что очередь достигла устойчивого состояния, даже если все процедуры были остановлены, а новые операции ввода в очередь запрещены. Таким образом операция очистки выполняется наилучшим возможным образом, как показано ниже.
Прежде чем попытаться очистить очередь, пользователь должен остановить все последующие задачи отправителя и получателя и ожидать подтверждения или прерывания любых выполняемых транзакций. Если пользователь знает предполагаемое количество элементов в очереди, он может настроить уведомление, которое сигнализирует о том, что все элементы выведены из очереди.
int numItemsDequeued;
int batchSize = 5;
ConditionalValue ret;
do
{
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if(ret.HasValue)
{
// Buffer the dequeues
processItems.Add(ret.Value);
}
} while (ret.HasValue && processItems.Count < batchSize);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
} while (ret.HasValue);
Обзор
Надежная параллельная очередь не предоставляет API TryPeekAsync. Пользователи могут получить семантику обзора с помощью метода TryDequeueAsync и прерывания транзакции. В этом примере вывод из очереди обрабатывается только в том случае, если значение элемента больше чем 10.
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
bool valueProcessed = false;
if (ret.HasValue)
{
if (ret.Value > 10)
{
// Process the item
Console.WriteLine("Value : " + ret.Value);
valueProcessed = true;
}
}
if (valueProcessed)
{
await txn.CommitAsync();
}
else
{
await txn.AbortAsync();
}
}
Дополнительные сведения
- Get started with Reliable Services (Начало работы с Reliable Services)
- Работа с надежными коллекциями
- Уведомления Reliable Services
- Резервное копирование и восстановление служб Reliable Services и субъектов Reliable Actors
- Настройка надежных служб с отслеживанием состояния
- ASP.NET Core в Service Fabric Reliable Services
- Дополнительные возможности использования модели программирования надежных служб
- Справочник разработчика по надежным коллекциям