Wprowadzenie do elementu ReliableConcurrentQueue w usłudze Azure Service Fabric
Reliable Concurrent Queue to asynchroniczna, transakcyjna i replikowana kolejka, która oferuje wysoką współbieżność dla operacji kolejkowania i dequeue. Została zaprojektowana w celu zapewnienia wysokiej przepływności i małych opóźnień dzięki złagodzeniu ścisłej kolejności FIFO zapewnianej przez niezawodną kolejkę , a zamiast tego zapewnia najlepsze zamówienie.
Interfejsy API
Kolejka współbieżna | Niezawodna kolejka współbieżna |
---|---|
void Enqueue(element T) | Zadanie EnqueueAsync(ITransaction tx, element T) |
bool TryDequeue(wynik T) | Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx) |
liczba int() | long Count() |
Porównanie z niezawodną kolejką
Niezawodna kolejka współbieżna jest oferowana jako alternatywa dla niezawodnej kolejki. Należy go używać w przypadkach, gdy ścisłe porządkowanie FIFO nie jest wymagane, ponieważ zagwarantowanie fiFO wymaga kompromisu z współbieżnością. Funkcja Reliable Queue używa blokad w celu wymuszania porządkowania FIFO, przy użyciu co najwyżej jednej transakcji dozwolonej do kolejkowania i co najwyżej jednej transakcji dozwolonej do dequeue naraz. Dla porównania funkcja Reliable Concurrent Queue zwalnia ograniczenie kolejności i umożliwia wykonywanie dowolnej liczby współbieżnych transakcji w celu przeplatania ich operacji kolejkowania i dequeue. Podane jest porządkowanie najwygodniejsze, jednak względne porządkowanie dwóch wartości w niezawodnej kolejce współbieżnej nigdy nie może być gwarantowane.
Niezawodna kolejka współbieżna zapewnia większą przepływność i mniejsze opóźnienie niż niezawodna kolejka , gdy istnieje wiele współbieżnych transakcji wykonujących kolejki i/lub dequeues.
Przykładowy przypadek użycia metody ReliableConcurrentQueue to scenariusz kolejki komunikatów . W tym scenariuszu co najmniej jeden producent komunikatów tworzy i dodaje elementy do kolejki, a co najmniej jeden odbiorca komunikatów ściąga komunikaty z kolejki i przetwarza je. Wielu producentów i konsumentów może pracować niezależnie, używając współbieżnych transakcji w celu przetworzenia kolejki.
Zalecenia dotyczące użytkowania
- Kolejka oczekuje, że elementy w kolejce mają niski okres przechowywania. Oznacza to, że elementy nie pozostaną w kolejce przez długi czas.
- Kolejka nie gwarantuje ścisłej kolejności FIFO.
- Kolejka nie odczytuje własnych zapisów. Jeśli element jest w kolejce w ramach transakcji, nie będzie widoczny dla dequeuer w ramach tej samej transakcji.
- Kolejki nie są odizolowane od siebie. Jeśli element A jest zdequeued w txnA transakcji, mimo że txnA nie jest zatwierdzony, element A nie będzie widoczny dla równoczesnej transakcji txnB. Jeśli txnA przerywa, A stanie się widoczny dla txnB natychmiast.
- Zachowanie tryPeekAsync można zaimplementować przy użyciu narzędzia TryDequeueAsync , a następnie przerwania transakcji. Przykład tego zachowania można znaleźć w sekcji Wzorce programowania.
- Liczba nie jest transakcyjna. Może służyć do uzyskania pojęcia liczby elementów w kolejce, ale reprezentuje punkt w czasie i nie można go polegać.
- Kosztowne przetwarzanie elementów w kolejce nie powinno być wykonywane, gdy transakcja jest aktywna, aby uniknąć długotrwałych transakcji, które mogą mieć wpływ na wydajność systemu.
Wstawki kodu
Przyjrzyjmy się kilku fragmentom kodu i ich oczekiwanym wynikom. Obsługa wyjątków jest ignorowana w tej sekcji.
Wystąpienia
Tworzenie wystąpienia niezawodnej kolejki współbieżnej jest podobne do każdej innej niezawodnej kolekcji.
IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");
EnqueueAsync
Poniżej przedstawiono kilka fragmentów kodu do używania metody EnqueueAsync, a następnie ich oczekiwane dane wyjściowe.
- Przypadek 1. Jedno zadanie w kolejce
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
Załóżmy, że zadanie zostało ukończone pomyślnie i że nie było współbieżnych transakcji modyfikujących kolejkę. Użytkownik może oczekiwać, że kolejka będzie zawierać elementy w dowolnej z następujących kolejności:
10, 20
20, 10
- Przypadek 2. Równoległe zadanie kolejki
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
await txn.CommitAsync();
}
Załóżmy, że zadania zostały ukończone pomyślnie, uruchomione równolegle i że nie było żadnych innych współbieżnych transakcji modyfikujących kolejkę. Nie można wnioskować o kolejności elementów w kolejce. W tym fragmencie kodu elementy mogą być wyświetlane w dowolnej z 4! możliwe zamówienia. Kolejka spróbuje zachować elementy w oryginalnej kolejności (w kolejce), ale może być zmuszona do zmiany kolejności z powodu współbieżnych operacji lub błędów.
DequeueAsync
Poniżej przedstawiono kilka fragmentów kodu do używania narzędzia TryDequeueAsync, a następnie oczekiwane dane wyjściowe. Załóżmy, że kolejka jest już wypełniona następującymi elementami w kolejce:
10, 20, 30, 40, 50, 60
- Przypadek 1. Zadanie z jedną kolejką
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await txn.CommitAsync();
}
Załóżmy, że zadanie zostało ukończone pomyślnie i że nie było współbieżnych transakcji modyfikujących kolejkę. Ponieważ nie można wnioskować o kolejności elementów w kolejce, wszystkie trzy elementy mogą być odsłonięty w kolejce, w dowolnej kolejności. Kolejka spróbuje zachować elementy w oryginalnej kolejności (w kolejce), ale może być zmuszona do zmiany kolejności z powodu współbieżnych operacji lub błędów.
- Przypadek 2. Równoległe zadanie dequeue
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
Załóżmy, że zadania zostały ukończone pomyślnie, uruchomione równolegle i że nie było żadnych innych współbieżnych transakcji modyfikujących kolejkę. Ponieważ nie można wnioskować o kolejności elementów w kolejce, listy dequeue1 i dequeue2 będą zawierać wszystkie dwa elementy w dowolnej kolejności.
Ten sam element nie będzie wyświetlany na obu listach. W związku z tym, jeśli dequeue1 ma 10, 30, to dequeue2 będzie miał 20, 40.
- Przypadek 3. Dequeue Ordering with Transaction Abort (Przypadek 3. Anulowanie zamawiania za pomocą transakcji przerwania)
Przerywanie transakcji z dequeues w locie umieszcza elementy z powrotem na głowie kolejki. Kolejność, w jakiej elementy są umieszczane z powrotem na głowie kolejki, nie jest gwarantowana. Przyjrzyjmy się następującej kodzie:
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
// Abort the transaction
await txn.AbortAsync();
}
Załóżmy, że elementy zostały w kolejce w następującej kolejności:
10, 20
Po przerwaniu transakcji elementy zostaną dodane z powrotem do nagłówka kolejki w dowolnej z następujących zamówień:
10, 20
20, 10
To samo dotyczy wszystkich przypadków, w których transakcja nie została pomyślnie zatwierdzona.
Wzorce programowania
W tej sekcji przyjrzyjmy się kilku wzorom programowania, które mogą być przydatne podczas korzystania z elementu ReliableConcurrentQueue.
Kolejki usługi Batch
Zalecanym wzorcem programowania jest wykonanie zadania odbiorcy wsadowego wsadowego w kolejce zamiast wykonywania jednego dequeue naraz. Użytkownik może wybrać ograniczenie opóźnień między każdą partią lub rozmiarem partii. Poniższy fragment kodu przedstawia ten model programowania. Należy pamiętać, że w tym przykładzie przetwarzanie odbywa się po zatwierdzeniu transakcji, więc jeśli podczas przetwarzania wystąpi błąd, nieprzetworzone elementy zostaną utracone bez przetworzenia. Alternatywnie przetwarzanie można wykonać w zakresie transakcji, jednak może mieć negatywny wpływ na wydajność i wymaga obsługi już przetworzonych elementów.
int batchSize = 5;
long delayMs = 100;
while(!cancellationToken.IsCancellationRequested)
{
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
for(int i = 0; i < batchSize; ++i)
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
else
{
// else break the for loop
break;
}
}
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
int delayFactor = batchSize - processItems.Count;
await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
Przetwarzanie oparte na powiadomieniach o najlepszym wysiłku
Inny interesujący wzorzec programowania używa interfejsu API count. W tym miejscu możemy zaimplementować przetwarzanie oparte na powiadomieniach dla kolejki. Liczba kolejek może służyć do ograniczania kolejki lub dequeue zadania. Należy pamiętać, że tak jak w poprzednim przykładzie, ponieważ przetwarzanie odbywa się poza transakcją, nieprzetworzone elementy mogą zostać utracone w przypadku wystąpienia błędu podczas przetwarzania.
int threshold = 5;
long delayMs = 1000;
while(!cancellationToken.IsCancellationRequested)
{
while (this.Queue.Count < threshold)
{
cancellationToken.ThrowIfCancellationRequested();
// If the queue does not have the threshold number of items, delay the task and check again
await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
}
// If there are approximately threshold number of items, try and process the queue
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
} while (processItems.Count < threshold && ret.HasValue);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
}
Opróżnianie o najlepszym wysiłku
Nie można zagwarantować opróżnienia kolejki ze względu na współbieżny charakter struktury danych. Możliwe, że nawet jeśli żadne operacje użytkownika w kolejce nie są wykonywane w locie, określone wywołanie metody TryDequeueAsync może nie zwracać elementu, który został wcześniej w kolejce i zatwierdzony. Element w kolejce ma gwarancję , że ostatecznie stanie się widoczny, aby usunąć kolejkę, jednak bez mechanizmu komunikacji poza pasmem niezależny konsument nie może wiedzieć, że kolejka osiągnęła stały stan, nawet jeśli wszyscy producenci zostali zatrzymani, a żadne nowe operacje kolejkowania nie są dozwolone. W związku z tym operacja opróżniania jest najlepszym wysiłkiem, jak zaimplementowano poniżej.
Użytkownik powinien zatrzymać wszystkie dalsze zadania producenta i konsumenta i poczekać na wszelkie transakcje w locie, aby zatwierdzić lub przerwać, przed podjęciem próby opróżnienia kolejki. Jeśli użytkownik zna oczekiwaną liczbę elementów w kolejce, może skonfigurować powiadomienie, które sygnalizuje, że wszystkie elementy zostały w kolejce.
int numItemsDequeued;
int batchSize = 5;
ConditionalValue ret;
do
{
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if(ret.HasValue)
{
// Buffer the dequeues
processItems.Add(ret.Value);
}
} while (ret.HasValue && processItems.Count < batchSize);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
} while (ret.HasValue);
Peek
Funkcja ReliableConcurrentQueue nie udostępnia interfejsu API TryPeekAsync . Użytkownicy mogą uzyskać semantyka podglądu przy użyciu narzędzia TryDequeueAsync , a następnie przerwania transakcji. W tym przykładzie kolejki są przetwarzane tylko wtedy, gdy wartość elementu jest większa niż 10.
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
bool valueProcessed = false;
if (ret.HasValue)
{
if (ret.Value > 10)
{
// Process the item
Console.WriteLine("Value : " + ret.Value);
valueProcessed = true;
}
}
if (valueProcessed)
{
await txn.CommitAsync();
}
else
{
await txn.AbortAsync();
}
}
Musi odczytywać
- Przewodnik Szybki start dotyczący usług Reliable Services
- Praca z elementami Reliable Collections
- Powiadomienia dotyczące usług Reliable Services
- Tworzenie i przywracanie kopii zapasowych usług Reliable Services (odzyskiwanie po awarii)
- Niezawodna konfiguracja programu State Manager
- Wprowadzenie do usług internetowego interfejsu API usługi Service Fabric
- Zaawansowane użycie modelu programowania usług Reliable Services
- Dokumentacja dla deweloperów dotycząca niezawodnych kolekcji