Prozkoumání kanálu změn ve službě Azure Cosmos DB
Kanál změn ve službě Azure Cosmos DB je trvalý záznam změn v kontejneru v pořadí, v jakém k nim dochází. Podpora kanálu změn ve službě Azure Cosmos DB funguje díky naslouchání změnám kontejneru Azure Cosmos DB. Výstupem je pak seznam změněných dokumentů v pořadí podle času úprav. Trvalé změny je možné zpracovat asynchronně a přírůstkově a výstup lze distribuovat mezi jednoho nebo více příjemců pro paralelní zpracování.
Kanál změn a různé operace
Dnes se v informačním kanálu změn zobrazí všechny vložení a aktualizace. Kanál změn nemůžete filtrovat pro konkrétní typ operace. V současné době kanál změn neprovádí operace odstranění protokolu. Jako alternativní řešení můžete přidat obnovitelné značky k odstraněným položkám. Můžete například přidat atribut do položky s názvem "odstraněno", nastavit jeho hodnotu na true a pak nastavit hodnotu TTL (time-to-live) u položky. Nastavením hodnoty TTL zajistíte, že se položka automaticky odstraní.
Čtení z kanálu změn služby Azure Cosmos DB
S kanálem změn služby Azure Cosmos DB můžete pracovat buď pomocí modelu nabízených oznámení, nebo modelu vyžádané replikace. S modelem nabízených oznámení procesor kanálu změn nasdílí práci klientovi, který má obchodní logiku pro zpracování této práce. Složitost při kontrole práce a ukládání stavu poslední zpracované práce se ale zpracovává v procesoru kanálu změn.
S modelem vyžádané replikace musí klient vyžádat práci ze serveru. V tomto případě má klient obchodní logiku pro zpracování práce a také ukládá stav pro poslední zpracovanou práci. Klient zpracovává vyrovnávání zatížení napříč několika klienty, které zpracovává paralelně, a zpracovává chyby.
Poznámka:
Doporučujeme použít model nabízených oznámení, protože se nemusíte starat o dotazování kanálu změn na budoucí změny, uložení stavu poslední zpracované změny a dalších výhod.
Většina scénářů, které používají kanál změn služby Azure Cosmos DB, používá jednu z možností modelu nabízených oznámení. Existují však některé scénáře, ve kterých můžete chtít mít větší kontrolu nad modelem vyžádání obsahu. Další ovládací prvek nízké úrovně zahrnuje:
- Čtení změn z konkrétního klíče oddílu
- Řízení tempa, jakým klient přijímá změny ke zpracování
- Jednorázové čtení existujících dat v kanálu změn (například migrace dat)
Čtení kanálu změn pomocí modelu nabízených oznámení
Z kanálu změn můžete číst dvěma způsoby pomocí modelu nabízených oznámení: triggery Azure Functions Azure Cosmos DB a knihovny procesoru kanálu změn. Azure Functions používá procesor kanálu změn na pozadí, takže se podobají oběma způsobům čtení kanálu změn. Azure Functions si můžete představit jako jednoduše hostitelská platforma pro procesor kanálu změn, nikoli úplně jiný způsob čtení kanálu změn. Azure Functions používá procesor kanálu změn na pozadí. Automaticky paralelizuje zpracování změn napříč oddíly kontejneru.
Azure Functions
V kanálu změn kontejneru Azure Cosmos DB můžete vytvořit malé reaktivní funkce, které se automaticky aktivují při každé nové události. S triggerem Služby Azure Functions pro službu Azure Cosmos DB můžete používat škálování procesoru kanálu změn a spolehlivé funkce detekce událostí bez nutnosti udržovat žádnou infrastrukturu pracovních procesů.
Procesor kanálu změn
Procesor kanálu změn je součástí sad SDK .NET služby Azure Cosmos DB V3 a Java V4 . Zjednodušuje proces čtení kanálu změn a distribuuje zpracování událostí mezi několik příjemců efektivně.
Implementace procesoru kanálu změn zahrnuje čtyři hlavní komponenty:
Monitorovaný kontejner: Monitorovaný kontejner obsahuje data, ze kterých se generuje kanál změn. Jakékoli vložení nebo aktualizace v monitorovaném kontejneru se projeví v kanálu změn kontejneru.
Kontejner zapůjčení: Kontejner zapůjčení funguje jako úložiště stavu a koordinuje zpracování kanálu změn napříč několika pracovními procesy. Kontejner zapůjčení může být uložený ve stejném účtu jako monitorovaný kontejner nebo v samostatném účtu.
Výpočetní instance: Výpočetní instance hostuje procesor kanálu změn, aby naslouchal změnám. V závislosti na platformě může představovat virtuální počítač, pod Kubernetes, instanci služby Aplikace Azure Service, skutečný fyzický počítač. V tomto článku má jedinečný identifikátor, na který odkazuje název instance.
Delegát: Delegát je kód, který definuje, co vy, vývojář, chce dělat s každou dávkou změn, které procesor kanálu změn čte.
Při implementaci procesoru kanálu změn je bod vstupu vždy monitorovaný kontejner z Container
instance, kterou voláte GetChangeFeedProcessorBuilder
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Kde první parametr je jedinečný název, který popisuje cíl tohoto procesoru a druhý název je implementace delegáta, která zpracovává změny. Následuje příklad delegáta:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Potom definujete název výpočetní instance nebo jedinečný identifikátor s WithInstanceName
, to by mělo být jedinečné a jiné v každé výpočetní instanci, kterou nasazujete, a nakonec, což je kontejner pro zachování stavu zapůjčení s WithLeaseContainer
.
Volání Build
poskytuje instanci procesoru, kterou můžete začít voláním StartAsync
.
Normální životní cyklus instance hostitele je následující:
- Načte kanál změn.
- Pokud nedošlo k žádným změnám, přejděte do režimu spánku po předdefinovanou dobu (přizpůsobitelné pomocí
WithPollInterval
Builder
) a přejděte na #1. - Pokud k nějakým změnám došlo, odešle je do delegáta.
- Jakmile delegát úspěšně dokončí zpracování změn, aktualizuje poslední zpracovaný bod v čase v úložišti zapůjčení a přejde ke kroku 1.