Sdílet prostřednictvím


Úvod do ReliableConcurrentQueue v Azure Service Fabric

Spolehlivá souběžná fronta je asynchronní, transakční a replikovaná fronta, která má vysokou souběžnost pro operace zařazení do fronty a dequeue. Je navržená tak, aby poskytovala vysokou propustnost a nízkou latenci uvolněním striktního řazení FIFO poskytovaného reliable queue a místo toho poskytuje řazení co nejlepšího úsilí.

Rozhraní API

Souběžná fronta Spolehlivá souběžná fronta
void Enqueue(T item) Task EnqueueAsync(ITransaction tx, T item)
bool TryDequeue(výsledek T) Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx)
int Count() long Count()

Porovnání se spolehlivou frontou

Spolehlivá souběžná fronta se nabízí jako alternativa ke spolehlivé frontě. Měla by se používat v případech, kdy není vyžadováno striktní řazení FIFO, protože záruka FIFO vyžaduje kompromis s souběžností. Spolehlivá fronta používá zámky k vynucení řazení FIFO, přičemž nejméně jedna transakce může vytvořit frontu a maximálně jednu transakci povoleno vyřadit najednou. Naproti tomu Spolehlivá souběžná fronta uvolní omezení řazení a umožňuje libovolné počet souběžných transakcí prokládání jejich enqueue a dequeue operace. Řazení nejlepších úsilí je k dispozici, ale relativní řazení dvou hodnot ve spolehlivé souběžné frontě nemůže být nikdy zaručeno.

Spolehlivá souběžná fronta poskytuje vyšší propustnost a nižší latenci než Spolehlivá fronta , kdykoli existuje více souběžných transakcí provádějících výčet a/nebo vyřazení z fronty.

Ukázkový případ použití pro ReliableConcurrentQueue je scénář fronty zpráv. V tomto scénáři jeden nebo více výrobců zpráv vytváří a přidává položky do fronty a jeden nebo více příjemců zpráv načítá zprávy z fronty a zpracovává je. Více výrobců a příjemců může pracovat nezávisle na sobě pomocí souběžných transakcí, aby bylo možné frontu zpracovat.

Pokyny k používání

  • Fronta očekává, že položky ve frontě mají nízkou dobu uchovávání. To znamená, že položky by dlouho nezůstaly ve frontě.
  • Fronta nezaručuje striktní řazení FIFO.
  • Fronta nečte vlastní zápisy. Pokud je položka zapsána do fronty v rámci transakce, nebude viditelná pro dequeuer v rámci stejné transakce.
  • Dequeues nejsou navzájem izolované. Pokud je položka A vyřazena z fronty v transakci txnA, i když txnA není potvrzena, položka A by nebyla viditelná pro souběžné transakce txnB. Pokud txnA přeruší, A se okamžitě zobrazí txnB.
  • Chování TryPeekAsync lze implementovat pomocí TryDequeueAsync a potom přerušit transakci. Příklad tohoto chování najdete v části Programovací vzory.
  • Počet není transakční. Dá se použít k získání představu o počtu prvků ve frontě, ale představuje bod v čase a nelze ho spoléhat.
  • Nákladné zpracování u odstraněných položek by nemělo být provedeno, když je transakce aktivní, aby se zabránilo dlouhotrvajícím transakcím, které mohou mít dopad na výkon systému.

Fragmenty kódu

Podívejme se na několik fragmentů kódu a jejich očekávaných výstupů. Zpracování výjimek se v této části ignoruje.

Doložení příkladem

Vytvoření instance spolehlivé souběžné fronty se podobá jakékoli jiné spolehlivé kolekci.

IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");

EnqueueAsync

Tady je několik fragmentů kódu pro použití enqueueAsync následovaných očekávanými výstupy.

  • Případ 1: Single Enqueue Task
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

Předpokládejme, že úloha byla úspěšně dokončena a že nedošlo k žádné souběžné transakci, které by upravovaly frontu. Uživatel může očekávat, že fronta bude obsahovat položky v některé z následujících objednávek:

10, 20

20, 10

  • Případ 2: Paralelní úloha enqueue
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 20, cancellationToken);

    await txn.CommitAsync();
}

// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
    await this.Queue.EnqueueAsync(txn, 40, cancellationToken);

    await txn.CommitAsync();
}

Předpokládejme, že úkoly byly úspěšně dokončeny, že úlohy běžely paralelně a že nebyly žádné další souběžné transakce, které upravují frontu. Pořadí položek ve frontě nelze odvozovat. Pro tento fragment kódu se položky můžou objevit v některém ze 4! možných objednávek. Fronta se pokusí zachovat položky v původním pořadí (ve frontě), ale může být nucena změnit jejich pořadí kvůli souběžným operacím nebo chybám.

DequeueAsync

Tady je několik fragmentů kódu pro použití tryDequeueAsync následovaných očekávanými výstupy. Předpokládejme, že fronta je již naplněna následujícími položkami ve frontě:

10, 20, 30, 40, 50, 60

  • Případ 1: Single Dequeue – úloha
using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    await txn.CommitAsync();
}

Předpokládejme, že úloha byla úspěšně dokončena a že nedošlo k žádné souběžné transakci, které by upravovaly frontu. Vzhledem k tomu, že nelze odvozovat pořadí položek ve frontě, mohou být všechny tři položky v libovolném pořadí vyřazeny z fronty. Fronta se pokusí zachovat položky v původním pořadí (ve frontě), ale může být nucena změnit jejich pořadí kvůli souběžným operacím nebo chybám.

  • Případ 2: Paralelní dequeue – úloha
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
    dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;

    await txn.CommitAsync();
}

Předpokládejme, že úkoly byly úspěšně dokončeny, že úlohy běžely paralelně a že nebyly žádné další souběžné transakce, které upravují frontu. Vzhledem k tomu, že nelze odvozovat pořadí položek ve frontě, seznamy dequeue1 a dequeue2 budou obsahovat všechny dvě položky v libovolném pořadí.

Stejná položka se nezobrazí v obou seznamech. Proto pokud má dequeue1 10, 30, pak dequeue2 by měl 20, 40.

  • Případ 3: Zrušení objednávky s přerušením transakce

Přerušení transakce s vyřazením z letu umístí položky zpět do hlavy fronty. Pořadí, ve kterém jsou položky vráceny do hlavy fronty, není zaručeno. Podívejme se na následující kód:

using (var txn = this.StateManager.CreateTransaction())
{
    await this.Queue.TryDequeueAsync(txn, cancellationToken);
    await this.Queue.TryDequeueAsync(txn, cancellationToken);

    // Abort the transaction
    await txn.AbortAsync();
}

Předpokládejme, že položky byly vyřazeny z fronty v následujícím pořadí:

10, 20

Když transakci přerušíme, položky by se přidaly zpět do hlavy fronty v některé z následujících objednávek:

10, 20

20, 10

Totéž platí pro všechny případy, kdy transakce nebyla úspěšně potvrzena.

Programovací vzory

V této části se podíváme na několik programovacích vzorů, které můžou být užitečné při používání ReliableConcurrentQueue.

Dávkové dequeues

Doporučeným programovacím vzorem je, aby úloha příjemce zasála své dequeuy místo provedení jednoho dequeue najednou. Uživatel se může rozhodnout, že omezí zpoždění mezi každou dávkou nebo velikostí dávky. Následující fragment kódu ukazuje tento programovací model. Mějte na paměti, že v tomto příkladu se zpracování provádí po potvrzení transakce, takže pokud při zpracování došlo k chybě, nezpracované položky budou ztraceny bez zpracování. Případně lze zpracování provést v rámci rozsahu transakce, ale může mít negativní dopad na výkon a vyžaduje zpracování položek, které jsou již zpracovány.

int batchSize = 5;
long delayMs = 100;

while(!cancellationToken.IsCancellationRequested)
{
    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        for(int i = 0; i < batchSize; ++i)
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
            else
            {
                // else break the for loop
                break;
            }
        }

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }

    int delayFactor = batchSize - processItems.Count;
    await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}

Zpracování na základě oznámení na základě nejlepšího úsilí

Dalším zajímavým programovacím vzorem je rozhraní API pro počet. Tady můžeme implementovat zpracování na základě oznámení na základě maximálního úsilí pro frontu. Počet front lze použít k omezení fronty nebo úlohy dequeue. Všimněte si, že stejně jako v předchozím příkladu, protože zpracování probíhá mimo transakci, mohou být nezpracované položky ztraceny, pokud během zpracování dojde k chybě.

int threshold = 5;
long delayMs = 1000;

while(!cancellationToken.IsCancellationRequested)
{
    while (this.Queue.Count < threshold)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // If the queue does not have the threshold number of items, delay the task and check again
        await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
    }

    // If there are approximately threshold number of items, try and process the queue

    // Buffer for dequeued items
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        ConditionalValue<int> ret;

        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if (ret.HasValue)
            {
                // If an item was dequeued, add to the buffer for processing
                processItems.Add(ret.Value);
            }
        } while (processItems.Count < threshold && ret.HasValue);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
}

Vyprázdnění v nejlepším úsilí

Vyprázdnění fronty nelze zaručit kvůli souběžné povaze datové struktury. Je možné, že i když ve frontě nejsou žádné uživatelské operace, konkrétní volání TryDequeueAsync nemusí vrátit položku, která byla dříve zařazena do fronty a potvrzena. Zařazení položky je zaručeno, že se nakonec stane viditelným pro vyřazení z fronty, ale bez vzdáleného komunikačního mechanismu nemůže nezávislý spotřebitel vědět, že fronta dosáhla stabilního stavu, i když byly zastaveny všechny producenty a nejsou povoleny žádné nové operace fronty. Proto je operace vyprázdnění nejlepší, jak je implementováno níže.

Uživatel by měl zastavit všechny další úlohy producenta a příjemce a počkat, až se všechny transakce v letu potvrdí nebo přeruší, a teprve potom se pokusí vyprázdnit frontu. Pokud uživatel ví očekávaný počet položek ve frontě, může nastavit oznámení, že všechny položky byly vyřazeny z fronty.

int numItemsDequeued;
int batchSize = 5;

ConditionalValue ret;

do
{
    List<int> processItems = new List<int>();

    using (var txn = this.StateManager.CreateTransaction())
    {
        do
        {
            ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);

            if(ret.HasValue)
            {
                // Buffer the dequeues
                processItems.Add(ret.Value);
            }
        } while (ret.HasValue && processItems.Count < batchSize);

        await txn.CommitAsync();
    }

    // Process the dequeues
    for (int i = 0; i < processItems.Count; ++i)
    {
        Console.WriteLine("Value : " + processItems[i]);
    }
} while (ret.HasValue);

Prohlížet

ReliableConcurrentQueue neposkytuje rozhraní Api TryPeekAsync . Uživatelé mohou získat náhled sémantiky pomocí TryDequeueAsync a pak přerušit transakci. V tomto příkladu se dequeus zpracovávají pouze v případě, že je hodnota položky větší než 10.

using (var txn = this.StateManager.CreateTransaction())
{
    ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
    bool valueProcessed = false;

    if (ret.HasValue)
    {
        if (ret.Value > 10)
        {
            // Process the item
            Console.WriteLine("Value : " + ret.Value);
            valueProcessed = true;
        }
    }

    if (valueProcessed)
    {
        await txn.CommitAsync();    
    }
    else
    {
        await txn.AbortAsync();
    }
}

Musí číst