Udostępnij za pośrednictwem


Wprowadzenie do elementu ReliableConcurrentQueue w usłudze Azure Service Fabric

Reliable Concurrent Queue to asynchroniczna, transakcyjna i replikowana kolejka, która oferuje wysoką współbieżność dla operacji kolejkowania i dequeue. Została zaprojektowana w celu zapewnienia wysokiej przepływności i małych opóźnień dzięki złagodzeniu ścisłej kolejności FIFO zapewnianej przez niezawodną kolejkę , a zamiast tego zapewnia najlepsze zamówienie.

Interfejsy API

Kolejka współbieżna Niezawodna kolejka współbieżna
void Enqueue(element T) Zadanie EnqueueAsync(ITransaction tx, element T)
bool TryDequeue(wynik T) Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
liczba int() long Count()

Porównanie z niezawodną kolejką

Niezawodna kolejka współbieżna jest oferowana jako alternatywa dla niezawodnej kolejki. Należy go używać w przypadkach, gdy ścisłe porządkowanie FIFO nie jest wymagane, ponieważ zagwarantowanie fiFO wymaga kompromisu z współbieżnością. Funkcja Reliable Queue używa blokad w celu wymuszania porządkowania FIFO, przy użyciu co najwyżej jednej transakcji dozwolonej do kolejkowania i co najwyżej jednej transakcji dozwolonej do dequeue naraz. Dla porównania funkcja Reliable Concurrent Queue zwalnia ograniczenie kolejności i umożliwia wykonywanie dowolnej liczby współbieżnych transakcji w celu przeplatania ich operacji kolejkowania i dequeue. Podane jest porządkowanie najwygodniejsze, jednak względne porządkowanie dwóch wartości w niezawodnej kolejce współbieżnej nigdy nie może być gwarantowane.

Niezawodna kolejka współbieżna zapewnia większą przepływność i mniejsze opóźnienie niż niezawodna kolejka , gdy istnieje wiele współbieżnych transakcji wykonujących kolejki i/lub dequeues.

Przykładowy przypadek użycia metody ReliableConcurrentQueue to scenariusz kolejki komunikatów . W tym scenariuszu co najmniej jeden producent komunikatów tworzy i dodaje elementy do kolejki, a co najmniej jeden odbiorca komunikatów ściąga komunikaty z kolejki i przetwarza je. Wielu producentów i konsumentów może pracować niezależnie, używając współbieżnych transakcji w celu przetworzenia kolejki.

Zalecenia dotyczące użytkowania

  • Kolejka oczekuje, że elementy w kolejce mają niski okres przechowywania. Oznacza to, że elementy nie pozostaną w kolejce przez długi czas.
  • Kolejka nie gwarantuje ścisłej kolejności FIFO.
  • Kolejka nie odczytuje własnych zapisów. Jeśli element jest w kolejce w ramach transakcji, nie będzie widoczny dla dequeuer w ramach tej samej transakcji.
  • Kolejki nie są odizolowane od siebie. Jeśli element A jest zdequeued w txnA transakcji, mimo że txnA nie jest zatwierdzony, element A nie będzie widoczny dla równoczesnej transakcji txnB. Jeśli txnA przerywa, A stanie się widoczny dla txnB natychmiast.
  • Zachowanie tryPeekAsync można zaimplementować przy użyciu narzędzia TryDequeueAsync , a następnie przerwania transakcji. Przykład tego zachowania można znaleźć w sekcji Wzorce programowania.
  • Liczba nie jest transakcyjna. Może służyć do uzyskania pojęcia liczby elementów w kolejce, ale reprezentuje punkt w czasie i nie można go polegać.
  • Kosztowne przetwarzanie elementów w kolejce nie powinno być wykonywane, gdy transakcja jest aktywna, aby uniknąć długotrwałych transakcji, które mogą mieć wpływ na wydajność systemu.

Wstawki kodu

Przyjrzyjmy się kilku fragmentom kodu i ich oczekiwanym wynikom. Obsługa wyjątków jest ignorowana w tej sekcji.

Wystąpienia

Tworzenie wystąpienia niezawodnej kolejki współbieżnej jest podobne do każdej innej niezawodnej kolekcji.

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

Poniżej przedstawiono kilka fragmentów kodu do używania metody EnqueueAsync, a następnie ich oczekiwane dane wyjściowe.

  • Przypadek 1. Jedno zadanie w kolejce
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

Załóżmy, że zadanie zostało ukończone pomyślnie i że nie było współbieżnych transakcji modyfikujących kolejkę. Użytkownik może oczekiwać, że kolejka będzie zawierać elementy w dowolnej z następujących kolejności:

10, 20

20, 10

  • Przypadek 2. Równoległe zadanie kolejki
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

Załóżmy, że zadania zostały ukończone pomyślnie, uruchomione równolegle i że nie było żadnych innych współbieżnych transakcji modyfikujących kolejkę. Nie można wnioskować o kolejności elementów w kolejce. W tym fragmencie kodu elementy mogą być wyświetlane w dowolnej z 4! możliwe zamówienia. Kolejka spróbuje zachować elementy w oryginalnej kolejności (w kolejce), ale może być zmuszona do zmiany kolejności z powodu współbieżnych operacji lub błędów.

DequeueAsync

Poniżej przedstawiono kilka fragmentów kodu do używania narzędzia TryDequeueAsync, a następnie oczekiwane dane wyjściowe. Załóżmy, że kolejka jest już wypełniona następującymi elementami w kolejce:

10, 20, 30, 40, 50, 60

  • Przypadek 1. Zadanie z jedną kolejką
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

Załóżmy, że zadanie zostało ukończone pomyślnie i że nie było współbieżnych transakcji modyfikujących kolejkę. Ponieważ nie można wnioskować o kolejności elementów w kolejce, wszystkie trzy elementy mogą być odsłonięty w kolejce, w dowolnej kolejności. Kolejka spróbuje zachować elementy w oryginalnej kolejności (w kolejce), ale może być zmuszona do zmiany kolejności z powodu współbieżnych operacji lub błędów.

  • Przypadek 2. Równoległe zadanie dequeue
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

Załóżmy, że zadania zostały ukończone pomyślnie, uruchomione równolegle i że nie było żadnych innych współbieżnych transakcji modyfikujących kolejkę. Ponieważ nie można wnioskować o kolejności elementów w kolejce, listy dequeue1 i dequeue2 będą zawierać wszystkie dwa elementy w dowolnej kolejności.

Ten sam element nie będzie wyświetlany na obu listach. W związku z tym, jeśli dequeue1 ma 10, 30, to dequeue2 będzie miał 20, 40.

  • Przypadek 3. Dequeue Ordering with Transaction Abort (Przypadek 3. Anulowanie zamawiania za pomocą transakcji przerwania)

Przerywanie transakcji z dequeues w locie umieszcza elementy z powrotem na głowie kolejki. Kolejność, w jakiej elementy są umieszczane z powrotem na głowie kolejki, nie jest gwarantowana. Przyjrzyjmy się następującej kodzie:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

Załóżmy, że elementy zostały w kolejce w następującej kolejności:

10, 20

Po przerwaniu transakcji elementy zostaną dodane z powrotem do nagłówka kolejki w dowolnej z następujących zamówień:

10, 20

20, 10

To samo dotyczy wszystkich przypadków, w których transakcja nie została pomyślnie zatwierdzona.

Wzorce programowania

W tej sekcji przyjrzyjmy się kilku wzorom programowania, które mogą być przydatne podczas korzystania z elementu ReliableConcurrentQueue.

Kolejki usługi Batch

Zalecanym wzorcem programowania jest wykonanie zadania odbiorcy wsadowego wsadowego w kolejce zamiast wykonywania jednego dequeue naraz. Użytkownik może wybrać ograniczenie opóźnień między każdą partią lub rozmiarem partii. Poniższy fragment kodu przedstawia ten model programowania. Należy pamiętać, że w tym przykładzie przetwarzanie odbywa się po zatwierdzeniu transakcji, więc jeśli podczas przetwarzania wystąpi błąd, nieprzetworzone elementy zostaną utracone bez przetworzenia. Alternatywnie przetwarzanie można wykonać w zakresie transakcji, jednak może mieć negatywny wpływ na wydajność i wymaga obsługi już przetworzonych elementów.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

Przetwarzanie oparte na powiadomieniach o najlepszym wysiłku

Inny interesujący wzorzec programowania używa interfejsu API count. W tym miejscu możemy zaimplementować przetwarzanie oparte na powiadomieniach dla kolejki. Liczba kolejek może służyć do ograniczania kolejki lub dequeue zadania. Należy pamiętać, że tak jak w poprzednim przykładzie, ponieważ przetwarzanie odbywa się poza transakcją, nieprzetworzone elementy mogą zostać utracone w przypadku wystąpienia błędu podczas przetwarzania.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

Opróżnianie o najlepszym wysiłku

Nie można zagwarantować opróżnienia kolejki ze względu na współbieżny charakter struktury danych. Możliwe, że nawet jeśli żadne operacje użytkownika w kolejce nie są wykonywane w locie, określone wywołanie metody TryDequeueAsync może nie zwracać elementu, który został wcześniej w kolejce i zatwierdzony. Element w kolejce ma gwarancję , że ostatecznie stanie się widoczny, aby usunąć kolejkę, jednak bez mechanizmu komunikacji poza pasmem niezależny konsument nie może wiedzieć, że kolejka osiągnęła stały stan, nawet jeśli wszyscy producenci zostali zatrzymani, a żadne nowe operacje kolejkowania nie są dozwolone. W związku z tym operacja opróżniania jest najlepszym wysiłkiem, jak zaimplementowano poniżej.

Użytkownik powinien zatrzymać wszystkie dalsze zadania producenta i konsumenta i poczekać na wszelkie transakcje w locie, aby zatwierdzić lub przerwać, przed podjęciem próby opróżnienia kolejki. Jeśli użytkownik zna oczekiwaną liczbę elementów w kolejce, może skonfigurować powiadomienie, które sygnalizuje, że wszystkie elementy zostały w kolejce.

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

Peek

Funkcja ReliableConcurrentQueue nie udostępnia interfejsu API TryPeekAsync . Użytkownicy mogą uzyskać semantyka podglądu przy użyciu narzędzia TryDequeueAsync , a następnie przerwania transakcji. W tym przykładzie kolejki są przetwarzane tylko wtedy, gdy wartość elementu jest większa niż 10.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

Musi odczytywać