Ändringsflödesprocessorn i Azure Cosmos DB
GÄLLER FÖR: NoSQL
Ändringsflödesprocessorn är en del av Azure Cosmos DB .NET V3 och Java V4 SDK:er. Det förenklar processen för att läsa ändringsflödet och distribuerar händelsebearbetningen över flera konsumenter effektivt.
Den största fördelen med att använda ändringsflödesprocessorn är dess feltoleranta design, vilket garanterar en "minst en gång"-leverans av alla händelser i ändringsflödet.
SDK:er som stöds
.Net V3 | Java | Node.JS | Python |
---|---|---|---|
✓ | ✓ | ✕ | ✕ |
Komponenter i ändringsflödesprocessorn
Ändringsflödesprocessorn har fyra huvudkomponenter:
Den övervakade containern: Den övervakade containern har de data som ändringsflödet genereras från. Infogningar och uppdateringar till den övervakade containern visas i containerns ändringsflöde.
Lånecontainern: Lånecontainern fungerar som tillståndslagring och samordnar bearbetningen av ändringsflödet mellan flera arbetare. Lånecontainern kan lagras i samma konto som den övervakade containern eller i ett separat konto.
Beräkningsinstansen: En beräkningsinstans är värd för ändringsflödesprocessorn för att lyssna efter ändringar. Beroende på plattform kan den representeras av en virtuell dator (VM), en Kubernetes-podd, en Azure App Service-instans eller en verklig fysisk dator. Beräkningsinstansen har en unik identifierare som kallas instansnamnet i den här artikeln.
Ombudet: Ombudet är den kod som definierar vad du, utvecklaren, vill göra med varje batch med ändringar som ändringsflödesprocessorn läser.
För att ytterligare förstå hur dessa fyra element i ändringsflödesprocessorn fungerar tillsammans ska vi titta på ett exempel i följande diagram. Den övervakade containern lagrar objekt och använder "City" som partitionsnyckel. Partitionsnyckelvärdena distribueras i intervall (varje intervall representerar en fysisk partition) som innehåller objekt.
Diagrammet visar två beräkningsinstanser och ändringsflödesprocessorn tilldelar olika intervall till varje instans för att maximera beräkningsfördelningen. Varje instans har ett annat unikt namn.
Varje intervall läss parallellt. Ett intervalls förlopp underhålls separat från andra intervall i lånecontainern via ett lånedokument . Kombinationen av lånen representerar det aktuella tillståndet för ändringsflödesprocessorn.
Implementera ändringsflödesprocessorn
Ändringsflödesprocessorn i .NET är tillgänglig för senaste versionsläge och alla versioner och borttagningsläge. Alla versioner och borttagningsläge är i förhandsversion och stöds för ändringsflödesprocessorn som börjar i version 3.40.0-preview.0
. Startpunkten för båda lägena är alltid den övervakade containern.
Om du vill läsa med det senaste versionsläget anropar GetChangeFeedProcessorBuilder
du i en Container
instans :
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Om du vill läsa med alla versioner och borttagningsläge anropar GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
du från instansen Container
:
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
För båda lägena är den första parametern ett distinkt namn som beskriver målet med den här processorn. Det andra namnet är den delegatimplementering som hanterar ändringar.
Här är ett exempel på ett ombud för senaste versionsläge:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Här är ett exempel på ett ombud för alla versioner och borttagningsläge:
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
Efteråt definierar du namnet på beräkningsinstansen eller den unika identifieraren med hjälp WithInstanceName
av . Namnet på beräkningsinstansen ska vara unikt och olika för varje beräkningsinstans som du distribuerar. Du anger att containern ska behålla lånetillståndet med hjälp WithLeaseContainer
av .
Samtal Build
ger dig den processorinstans som du kan starta genom att anropa StartAsync
.
Kommentar
Föregående kodfragment tas från exempel i GitHub. Du kan hämta exemplet för det senaste versionsläget eller alla versioner och borttagningsläget.
Livscykel för bearbetning
Den normala livscykeln för en värdinstans är:
- Läs ändringsflödet.
- Om det inte finns några ändringar kan du viloläge under en fördefinierad tid (anpassningsbar med hjälp
WithPollInterval
av i Builder) och gå till #1. - Om det finns ändringar skickar du dem till ombudet.
- När ombudet har slutfört bearbetningen av ändringarna uppdaterar du lånearkivet med den senaste bearbetade tidpunkten och går till #1.
Felhantering
Ändringsflödesprocessorn är motståndskraftig mot fel i användarkod. Om delegatimplementeringen har ett ohanterat undantag (steg 4) stoppas tråden som bearbetar den specifika batchen med ändringar och en ny tråd skapas så småningom. Den nya tråden kontrollerar den senaste tidpunkten som lånearkivet har sparat för det intervallet med partitionsnyckelvärden. Den nya tråden startas om därifrån och skickar i praktiken samma batch med ändringar till ombudet. Det här beteendet fortsätter tills ombudet bearbetar ändringarna korrekt, och det är anledningen till att ändringsflödesprocessorn har en "minst en gång"-garanti.
Kommentar
I endast ett scenario görs inte en batch med ändringar på nytt. Om felet inträffar vid den första ombudskörningen någonsin har lånearkivet inget tidigare sparat tillstånd som ska användas vid återförsöket. I dessa fall använder återförsöket den inledande startkonfigurationen, som kanske eller kanske inte innehåller den sista batchen.
Om du vill förhindra att ändringsflödesprocessorn "fastnar" och kontinuerligt försöker utföra samma batch med ändringar igen bör du lägga till logik i ombudskoden för att skriva dokument, med undantag, till en kö med felmeddelanden. Den här designen säkerställer att du kan hålla reda på obearbetade ändringar samtidigt som du kan fortsätta att bearbeta framtida ändringar. Den felande meddelandekön kan vara en annan Azure Cosmos DB-container. Det exakta datalagret spelar ingen roll. Du vill bara att de obearbetade ändringarna ska sparas.
Du kan också använda ändringsflödesestimatorn för att övervaka förloppet för dina instanser av ändringsflödesprocessorn när de läser ändringsflödet, eller så kan du använda livscykelmeddelanden för att identifiera underliggande fel.
Livscykelmeddelanden
Du kan ansluta ändringsflödesprocessorn till alla relevanta händelser i livscykeln. Du kan välja att bli meddelad till en eller alla av dem. Rekommendationen är att minst registrera felmeddelandet:
- Registrera en hanterare för att meddelas när den aktuella värden skaffar ett lån för
WithLeaseAcquireNotification
att börja bearbeta det. - Registrera en hanterare för
WithLeaseReleaseNotification
att meddelas när den aktuella värden släpper ett lån och slutar bearbeta det. - Registrera en hanterare för
WithErrorNotification
att meddelas när den aktuella värden stöter på ett undantag under bearbetningen. Du måste kunna skilja på om källan är användardelegaten (ett ohanterat undantag) eller ett fel som processorn stöter på när den försöker komma åt den övervakade containern (till exempel nätverksproblem).
Livscykelmeddelanden är tillgängliga i båda ändringsflödeslägena. Här är ett exempel på livscykelmeddelanden i senaste versionsläge:
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Distributionsenhet
En distributionsenhet för en ändringsflödesprocessor består av en eller flera beräkningsinstanser som har samma värde för processorName
och samma lånecontainerkonfiguration, men olika instansnamn. Du kan ha många distributionsenheter där varje enhet har ett annat affärsflöde för ändringarna och varje distributionsenhet består av en eller flera instanser.
Du kan till exempel ha en distributionsenhet som utlöser ett externt API varje gång det sker en ändring i containern. En annan distributionsenhet kan flytta data i realtid varje gång en ändring sker. När en ändring sker i din övervakade container meddelas alla dina distributionsenheter.
Dynamisk skalning
Som tidigare nämnts kan du i en distributionsenhet ha en eller flera beräkningsinstanser. För att dra nytta av beräkningsdistributionen i distributionsenheten är de enda viktiga kraven att:
- Alla instanser ska ha samma lånecontainerkonfiguration.
- Alla instanser bör ha samma värde för
processorName
. - Varje instans måste ha ett unikt instansnamn (
WithInstanceName
).
Om dessa tre villkor gäller distribuerar ändringsflödesprocessorn alla lån som finns i lånecontainern över alla pågående instanser av distributionsenheten och parallelliserar beräkning med hjälp av en algoritm för lika fördelning. Ett lån ägs av en instans när som helst, så antalet instanser bör inte vara större än antalet lån.
Antalet instanser kan öka och minska. Ändringsflödesprocessorn justerar belastningen dynamiskt genom att omdistribuera den.
Dessutom kan ändringsflödesprocessorn dynamiskt justera en containers skala om containerns dataflöde eller lagring ökar. När containern växer hanterar ändringsflödesprocessorn scenariot transparent genom att dynamiskt öka lånen och distribuera de nya lånen mellan befintliga instanser.
Starttid
När en ändringsflödesprocessor startar för första gången initierar den som standard lånecontainern och startar bearbetningslivscykeln. Ändringar som har gjorts i den övervakade containern innan ändringsflödesprocessorn initieras för första gången identifieras inte.
Läsa från ett tidigare datum och en tidigare tid
Det är möjligt att initiera ändringsflödesprocessorn för att läsa ändringar som börjar vid ett visst datum och en viss tid genom att skicka en instans av DateTime
till WithStartTime
builder-tillägget:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
Ändringsflödesprocessorn initieras för det specifika datumet och tiden, och den börjar läsa ändringarna som skedde efteråt.
Läsa från början
I andra scenarier, till exempel i datamigreringar eller om du analyserar hela historiken för en container, måste du läsa ändringsflödet från början av containerns livslängd. Du kan använda WithStartTime
på builder-tillägget, men skicka DateTime.MinValue.ToUniversalTime()
, vilket genererar UTC-representationen av minimivärdet DateTime
som i det här exemplet:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
Ändringsflödesprocessorn initieras och börjar läsa ändringar från början av containerns livslängd.
Kommentar
De här anpassningsalternativen fungerar bara för att konfigurera startpunkten i tiden för ändringsflödesprocessorn. När lånecontainern har initierats för första gången har det ingen effekt att ändra de här alternativen.
Det går bara att anpassa startpunkten för senaste ändringsflödesläget. När du använder alla versioner och tar bort läge måste du börja läsa från den tidpunkt då processorn startas, eller återuppta från ett tidigare lånetillstånd som ligger inom kvarhållningsperioden för kontinuerlig säkerhetskopiering för ditt konto.
Ändra feed och etablerat dataflöde
Läsåtgärder för ändringsflöde på den övervakade containern förbrukar enheter för begäranden. Kontrollera att den övervakade containern inte har någon begränsning. Begränsning lägger till fördröjningar i mottagandet av ändringsflödeshändelser på dina processorer.
Åtgärder i lånecontainern (uppdatering och underhåll av tillstånd) förbrukar enheter för begäranden. Ju högre antal instanser som använder samma lånecontainer, desto högre är den potentiella förbrukningen av enheter för begäranden. Kontrollera att lånecontainern inte har någon begränsning. Begränsning lägger till fördröjningar i mottagandet av ändringsflödeshändelser. Begränsning kan till och med helt avsluta bearbetningen.
Dela lånecontainern
Du kan dela en lånecontainer i flera distributionsenheter. I en container med delat lån lyssnar varje distributionsenhet på en annan övervakad container eller har ett annat värde för processorName
. I den här konfigurationen upprätthåller varje distributionsenhet ett oberoende tillstånd för lånecontainern. Granska enhetsförbrukningen för begäran för en lånecontainer för att se till att det etablerade dataflödet räcker för alla distributionsenheter.
Avancerad lånekonfiguration
Tre nyckelkonfigurationer kan påverka hur ändringsflödesprocessorn fungerar. Varje konfiguration påverkar förbrukningen för begärandeenheten i lånecontainern. Du kan ange någon av dessa konfigurationer när du skapar ändringsflödesprocessorn, men använd dem noggrant:
- Låneförvärv: Som standard var 17:e sekund. En värd kontrollerar regelbundet tillståndet för lånearkivet och överväger att skaffa lån som en del av den dynamiska skalningsprocessen . Den här processen utförs genom att köra en fråga på lånecontainern. Om du minskar det här värdet går det snabbare att ombalansera och förvärva lån, men det ökar förbrukningen för begärandeenheter i lånecontainern.
- Förfallotid för lån: Som standard 60 sekunder. Definierar den maximala tid som ett lån kan finnas utan någon förnyelseaktivitet innan det förvärvas av en annan värd. När en värd kraschar hämtas lånen som den ägde av andra värdar efter den här tidsperioden plus det konfigurerade förnyelseintervallet. Om du minskar det här värdet går det snabbare att återställa efter en värdkrasch, men förfallovärdet bör aldrig vara lägre än förnyelseintervallet.
- Förnyelse av lån: Som standard var 13:e sekund. En värd som äger ett lån förnyar lånet regelbundet, även om det inte finns några nya ändringar att använda. Den här processen görs genom att köra en Replace på lånet. Om du minskar det här värdet minskar den tid som krävs för att identifiera lån som förlorats av en värd som kraschar, men det ökar förbrukningen för begärandeenhet på lånecontainern.
Var du ska vara värd för ändringsflödesprocessorn
Ändringsflödesprocessorn kan finnas på valfri plattform som stöder tidskrävande processer eller uppgifter. Nedan följer några exempel:
- En instans av WebJobs i Azure App Service som körs kontinuerligt
- En process i en instans av Azure Virtual Machines
- Ett bakgrundsjobb i Azure Kubernetes Service
- En serverlös funktion i Azure Functions
- En ASP.NET värdbaserad tjänst
Även om ändringsflödesprocessorn kan köras i kortvariga miljöer eftersom lånecontainern upprätthåller tillståndet, lägger startcykeln för dessa miljöer till fördröjningar i den tid det tar att ta emot meddelanden (på grund av kostnaden för att starta processorn varje gång miljön startas).
Rollbaserade åtkomstkrav
När du använder Microsoft Entra-ID som autentiseringsmekanism kontrollerar du att identiteten har rätt behörigheter:
- I den övervakade containern:
Microsoft.DocumentDB/databaseAccounts/readMetadata
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- I lånecontainern:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
Ytterligare resurser
- Azure Cosmos DB SDK
- Slutför exempelprogrammet på GitHub
- Ytterligare användningsexempel på GitHub
- Azure Cosmos DB-workshoplabb för ändringsflödesprocessor
Nästa steg
Läs mer om ändringsflödesprocessorn i följande artiklar: