Introduktion till ReliableConcurrentQueue i Azure Service Fabric
Reliable Concurrent Queue är en asynkron, transaktionell och replikerad kö som har hög samtidighet för queue- och dequeue-åtgärder. Den är utformad för att leverera högt dataflöde och låg svarstid genom att lätta på den strikta FIFO-beställningen som tillhandahålls av Reliable Queue och ger istället en beställning med bästa möjliga arbete.
API:er
Samtidig kö | Reliable Concurrent Queue |
---|---|
void Enqueue(T item) | Uppgift EnqueueAsync(ITransaction tx, T-objekt) |
bool TryDequeue(ut T-resultat) | Task< ConditionalValue < T >> TryDequeueAsync(ITransaction tx) |
int Count() | long Count() |
Jämförelse med Reliable Queue
Reliable Concurrent Queue erbjuds som ett alternativ till Reliable Queue. Det bör användas i fall där strikt FIFO-beställning inte krävs, eftersom garanti av FIFO kräver en kompromiss med samtidighet. Reliable Queue använder lås för att framtvinga FIFO-beställning, med högst en transaktion tillåten att köa och högst en transaktion tillåts att dequeue åt gången. Som jämförelse kopplar Reliable Concurrent Queue av ordningsbegränsningen och gör det möjligt för valfritt antal samtidiga transaktioner att interleave sina queue- och dequeue-åtgärder. Beställning med bästa förmåga tillhandahålls, men den relativa ordningen på två värden i en tillförlitlig samtidig kö kan aldrig garanteras.
Reliable Concurrent Queue ger högre dataflöde och lägre svarstid än Reliable Queue när det finns flera samtidiga transaktioner som utför köer och/eller dequeues.
Ett exempel på användningsfall för ReliableConcurrentQueue är scenariot meddelandekö . I det här scenariot skapar och lägger en eller flera meddelandeproducenter till i kön, och en eller flera meddelandekonsumenter hämtar meddelanden från kön och bearbetar dem. Flera producenter och konsumenter kan arbeta oberoende av varandra och använda samtidiga transaktioner för att bearbeta kön.
Användningsriktlinjer
- Kön förväntar sig att objekten i kön har en låg kvarhållningsperiod. Det vill sa att objekten inte skulle stanna i kön under en längre tid.
- Kön garanterar inte strikt FIFO-beställning.
- Kön läser inte sina egna skrivningar. Om ett objekt lagras i en transaktion visas det inte för en dequeuer i samma transaktion.
- Dequeues är inte isolerade från varandra. Om objekt A tas bort i txnA-transaktion, även om txnA inte har checkats in, skulle objekt A inte vara synligt för en samtidig transaktions-txnB. Om txnA avbryts blir A synligt för txnB omedelbart.
- TryPeekAsync-beteende kan implementeras med hjälp av en TryDequeueAsync och sedan avbryta transaktionen. Ett exempel på det här beteendet finns i avsnittet Programmeringsmönster.
- Antalet är icke-transaktionellt. Det kan användas för att få en uppfattning om antalet element i kön, men representerar en tidpunkt och kan inte lita på.
- Dyr bearbetning av de borttagna objekten bör inte utföras medan transaktionen är aktiv, för att undvika långvariga transaktioner som kan ha en prestandapåverkan på systemet.
Kodfragment
Låt oss titta på några kodfragment och deras förväntade utdata. Undantagshantering ignoreras i det här avsnittet.
Instansiering
Att skapa en instans av en tillförlitlig samtidig kö liknar alla andra tillförlitliga samlingar.
IReliableConcurrentQueue<int> queue = await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<int>>("myQueue");
EnqueueAsync
Här följer några kodfragment för att använda EnqueueAsync följt av deras förväntade utdata.
- Fall 1: Enqueue-uppgift
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
Anta att uppgiften har slutförts och att det inte fanns några samtidiga transaktioner som ändrar kön. Användaren kan förvänta sig att kön innehåller objekten i någon av följande beställningar:
10, 20
20, 10
- Fall 2: Parallell köaktivitet
// Parallel Task 1
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 10, cancellationToken);
await this.Queue.EnqueueAsync(txn, 20, cancellationToken);
await txn.CommitAsync();
}
// Parallel Task 2
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.EnqueueAsync(txn, 30, cancellationToken);
await this.Queue.EnqueueAsync(txn, 40, cancellationToken);
await txn.CommitAsync();
}
Anta att uppgifterna har slutförts, att aktiviteterna kördes parallellt och att det inte fanns några andra samtidiga transaktioner som ändrar kön. Det går inte att dra någon slutsats om ordningen på objekten i kön. För det här kodfragmentet kan objekten visas i någon av 4! möjliga orderings. Kön försöker behålla objekten i den ursprungliga ordningen (enqueued), men kan tvingas ändra ordning på dem på grund av samtidiga åtgärder eller fel.
DequeueAsync
Här följer några kodfragment för att använda TryDequeueAsync följt av förväntade utdata. Anta att kön redan är ifylld med följande objekt i kön:
10, 20, 30, 40, 50, 60
- Fall 1: Enskild dequeue-uppgift
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await txn.CommitAsync();
}
Anta att uppgiften har slutförts och att det inte fanns några samtidiga transaktioner som ändrar kön. Eftersom ingen slutsatsdragning kan göras om ordningen på objekten i kön, kan tre av objekten dequeued, i valfri ordning. Kön försöker behålla objekten i den ursprungliga ordningen (enqueued), men kan tvingas ändra ordning på dem på grund av samtidiga åtgärder eller fel.
- Fall 2: Parallell dequeue-uppgift
// Parallel Task 1
List<int> dequeue1;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue1.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
// Parallel Task 2
List<int> dequeue2;
using (var txn = this.StateManager.CreateTransaction())
{
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
dequeue2.Add(await this.Queue.TryDequeueAsync(txn, cancellationToken)).val;
await txn.CommitAsync();
}
Anta att uppgifterna har slutförts, att aktiviteterna kördes parallellt och att det inte fanns några andra samtidiga transaktioner som ändrar kön. Eftersom ingen slutsatsdragning kan göras om ordningen på objekten i kön, innehåller listorna dequeue1 och dequeue2 var och en två objekt, i valfri ordning.
Samma objekt visas inte i båda listorna. Därför, om dequeue1 har 10, 30, skulle dequeue2 ha 20, 40.
- Fall 3: Avqueue-beställning med transaktions abort
Om du avbryter en transaktion med dequeues under flygning hamnar objekten på köns huvud igen. Ordningen i vilken objekten sätts tillbaka på köns huvud garanteras inte. Låt oss titta på följande kod:
using (var txn = this.StateManager.CreateTransaction())
{
await this.Queue.TryDequeueAsync(txn, cancellationToken);
await this.Queue.TryDequeueAsync(txn, cancellationToken);
// Abort the transaction
await txn.AbortAsync();
}
Anta att objekten har dequeued i följande ordning:
10, 20
När vi avbryter transaktionen läggs objekten tillbaka till köns huvud i någon av följande beställningar:
10, 20
20, 10
Detsamma gäller för alla fall där transaktionen inte har checkats in.
Programmeringsmönster
I det här avsnittet ska vi titta på några programmeringsmönster som kan vara till hjälp vid användning av ReliableConcurrentQueue.
Batch-dequeues
Ett rekommenderat programmeringsmönster är att konsumentuppgiften batchar sina dequeues i stället för att utföra en dequeue i taget. Användaren kan välja att begränsa fördröjningar mellan varje batch eller batchstorlek. Följande kodfragment visar den här programmeringsmodellen. Tänk på att bearbetningen i det här exemplet utförs efter att transaktionen har checkats in, så om ett fel skulle inträffa under bearbetningen går de obearbetade objekten förlorade utan att ha bearbetats. Alternativt kan bearbetningen utföras inom transaktionens omfång, men det kan ha en negativ inverkan på prestanda och kräver hantering av de objekt som redan har bearbetats.
int batchSize = 5;
long delayMs = 100;
while(!cancellationToken.IsCancellationRequested)
{
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
for(int i = 0; i < batchSize; ++i)
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
else
{
// else break the for loop
break;
}
}
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
int delayFactor = batchSize - processItems.Count;
await Task.Delay(TimeSpan.FromMilliseconds(delayMs * delayFactor), cancellationToken);
}
Meddelandebaserad bearbetning med bästa förmåga
Ett annat intressant programmeringsmönster använder count-API:et. Här kan vi implementera meddelandebaserad bearbetning med bästa möjliga arbete för kön. Köantalet kan användas för att begränsa en kö eller en dequeue-uppgift. Observera att som i föregående exempel, eftersom bearbetningen sker utanför transaktionen, kan obearbetade objekt gå förlorade om ett fel inträffar under bearbetningen.
int threshold = 5;
long delayMs = 1000;
while(!cancellationToken.IsCancellationRequested)
{
while (this.Queue.Count < threshold)
{
cancellationToken.ThrowIfCancellationRequested();
// If the queue does not have the threshold number of items, delay the task and check again
await Task.Delay(TimeSpan.FromMilliseconds(delayMs), cancellationToken);
}
// If there are approximately threshold number of items, try and process the queue
// Buffer for dequeued items
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue<int> ret;
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if (ret.HasValue)
{
// If an item was dequeued, add to the buffer for processing
processItems.Add(ret.Value);
}
} while (processItems.Count < threshold && ret.HasValue);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
}
Best-Effort Drain
Det går inte att garantera att kön töms på grund av datastrukturens samtidiga karaktär. Det är möjligt att ett visst anrop till TryDequeueAsync, även om inga användaråtgärder i kön är under flygning, kanske inte returnerar ett objekt som tidigare har sparats och checkats in. Den köade posten är garanterad att så småningom bli synlig för borttagning, men utan en out-of-band-kommunikationsmekanism kan en oberoende konsument inte veta att kön har nått ett stabilt tillstånd även om alla producenter har stoppats och inga nya köåtgärder tillåts. Därför är dräneringsåtgärden bästa möjliga genom att implementeras nedan.
Användaren bör stoppa alla ytterligare producent- och konsumentuppgifter och vänta tills alla transaktioner under flygning har checkat in eller avbrutits innan de försöker tömma kön. Om användaren känner till det förväntade antalet objekt i kön kan de ställa in ett meddelande som signalerar att alla objekt har tagits bort.
int numItemsDequeued;
int batchSize = 5;
ConditionalValue ret;
do
{
List<int> processItems = new List<int>();
using (var txn = this.StateManager.CreateTransaction())
{
do
{
ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
if(ret.HasValue)
{
// Buffer the dequeues
processItems.Add(ret.Value);
}
} while (ret.HasValue && processItems.Count < batchSize);
await txn.CommitAsync();
}
// Process the dequeues
for (int i = 0; i < processItems.Count; ++i)
{
Console.WriteLine("Value : " + processItems[i]);
}
} while (ret.HasValue);
Granska
ReliableConcurrentQueue tillhandahåller inte Api:et TryPeekAsync . Användare kan få en titt på semantiken med hjälp av en TryDequeueAsync och sedan avbryta transaktionen. I det här exemplet bearbetas dequeues endast om objektets värde är större än 10.
using (var txn = this.StateManager.CreateTransaction())
{
ConditionalValue ret = await this.Queue.TryDequeueAsync(txn, cancellationToken);
bool valueProcessed = false;
if (ret.HasValue)
{
if (ret.Value > 10)
{
// Process the item
Console.WriteLine("Value : " + ret.Value);
valueProcessed = true;
}
}
if (valueProcessed)
{
await txn.CommitAsync();
}
else
{
await txn.AbortAsync();
}
}
Måste läsas
- Snabbstart för Reliable Services
- Arbeta med Reliable Collections
- Reliable Services-meddelanden
- Säkerhetskopiering och återställning av Reliable Services (haveriberedskap)
- Reliable State Manager-konfiguration
- Komma igång med Service Fabric Web API Services
- Avancerad användning av reliable services-programmeringsmodellen
- Utvecklarreferens för tillförlitliga samlingar