Sdílet prostřednictvím


Procesor kanálu změn ve službě Azure Cosmos DB

PLATÍ PRO: NoSQL

Procesor kanálu změn je součástí sad SDK .NET služby Azure Cosmos DB V3 a Java V4 . Zjednodušuje proces čtení kanálu změn a distribuuje zpracování událostí mezi několik příjemců efektivně.

Hlavní výhodou použití procesoru kanálu změn je návrh odolný proti chybám, který zajišťuje "alespoň jednou" doručení všech událostí v kanálu změn.

Podporované sady SDK

.Net V3 Java Node.JS Python

Komponenty procesoru kanálu změn

Procesor kanálu změn má čtyři hlavní komponenty:

  • Monitorovaný kontejner: Monitorovaný kontejner obsahuje data, ze kterých se generuje kanál změn. Jakékoli vložení nebo aktualizace v monitorovaném kontejneru se projeví v kanálu změn kontejneru.

  • Kontejner zapůjčení: Kontejner zapůjčení funguje jako úložiště stavu a koordinuje zpracování kanálu změn napříč několika pracovními procesy. Kontejner zapůjčení může být uložený ve stejném účtu jako monitorovaný kontejner nebo v samostatném účtu.

  • Výpočetní instance: Výpočetní instance hostuje procesor kanálu změn, aby naslouchal změnám. V závislosti na platformě ho může reprezentovat virtuální počítač, pod Kubernetes, instance služby Aplikace Azure Service nebo skutečný fyzický počítač. Výpočetní instance má jedinečný identifikátor, který se v tomto článku nazývá název instance.

  • Delegát: Delegát je kód, který definuje, co vy, vývojář, chce dělat s každou dávkou změn, které procesor kanálu změn čte.

Abychom lépe pochopili, jak tyto čtyři prvky procesoru kanálu změn spolupracují, podívejme se na příklad v následujícím diagramu. Monitorovaný kontejner ukládá položky a jako klíč oddílu používá City. Hodnoty klíče oddílu se distribuují v oblastech (každá oblast představuje fyzický oddíl), který obsahuje položky.

Diagram znázorňuje dvě výpočetní instance a procesor kanálu změn přiřadí jednotlivým instancím různé rozsahy, aby se maximalizovala distribuce výpočetních prostředků. Každá instance má jiný jedinečný název.

Každá oblast se čte paralelně. Průběh rozsahu se udržuje odděleně od ostatních oblastí v kontejneru zapůjčení prostřednictvím dokumentu zapůjčení . Kombinace zapůjčení představuje aktuální stav procesoru kanálu změn.

Příklad procesoru kanálu změn

Implementace procesoru kanálu změn

Procesor kanálu změn v .NET je k dispozici pro nejnovější režim verzí a všechny verze a režim odstranění. Všechny verze a režim odstranění jsou ve verzi Preview a podporuje se pro procesor kanálu změn počínaje verzí 3.40.0-preview.0. Vstupní bod pro oba režimy je vždy monitorovaný kontejner.

Pokud chcete číst pomocí režimu nejnovější verze, zavoláte GetChangeFeedProcessorBuilderv Container instanci:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Pokud chcete číst pomocí všech verzí a odstranit režim, zavolejte GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes z Container instance:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

V obou režimech je prvním parametrem jedinečný název, který popisuje cíl tohoto procesoru. Druhým názvem je implementace delegáta, která zpracovává změny.

Tady je příklad delegáta pro nejnovější režim verze:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Tady je příklad delegáta pro všechny verze a režim odstranění:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Poté pomocí . definujete název výpočetní instance nebo jedinečný identifikátor .WithInstanceName Název výpočetní instance by měl být jedinečný a jiný pro každou výpočetní instanci, kterou nasazujete. Kontejner nastavíte tak, aby se zachoval stav zapůjčení pomocí WithLeaseContainer.

Volání Build poskytuje instanci procesoru, kterou můžete začít voláním StartAsync.

Poznámka:

Předchozí fragmenty kódu pocházejí z ukázek na GitHubu. Ukázku můžete získat pro nejnovější režim verze nebo všechny verze a režim odstranění.

Životní cyklus zpracování

Normální životní cyklus instance hostitele je následující:

  1. Načte kanál změn.
  2. Pokud nedojde k žádným změnám, přejděte do režimu spánku po předdefinovanou dobu (přizpůsobitelné pomocí WithPollInterval v Tvůrci) a přejděte na #1.
  3. Pokud dojde ke změnám, pošlete je delegátu.
  4. Jakmile delegát dokončí zpracování změn úspěšně, aktualizujte úložiště zapůjčení nejnovějším zpracovaným bodem v čase a přejděte na #1.

Zpracování chyb

Procesor kanálu změn je odolný vůči chybám uživatelského kódu. Pokud má implementace delegáta neošetřenou výjimku (krok 4), vlákno, které zpracovává danou dávku změn, se zastaví a nakonec se vytvoří nové vlákno. Nové vlákno zkontroluje poslední bod v čase, který úložiště zapůjčení uložilo pro tento rozsah hodnot klíče oddílu. Nové vlákno se odsud restartuje a delegování efektivně odesílá stejnou dávku změn. Toto chování pokračuje, dokud váš delegát nezpracuje změny správně, a je to důvod, proč procesor kanálu změn má záruku "alespoň jednou".

Poznámka:

V jednom scénáři se dávka změn neopakuje. Pokud k selhání dojde při prvním spuštění delegáta, úložiště zapůjčení nemá pro opakování žádný předchozí uložený stav. V takových případech se při opakování použije počáteční spouštěcí konfigurace, která může nebo nemusí obsahovat poslední dávku.

Pokud chcete zabránit tomu, aby procesor kanálu změn neustále zablokoval opakování stejné dávky změn, měli byste do fronty chybových zpráv přidat do kódu delegáta logiku pro zápis dokumentů, při výjimce. Tento návrh zajistí, že budete moct sledovat nezpracované změny a zároveň i nadále zpracovávat budoucí změny. Fronta chybových zpráv může být dalším kontejnerem Azure Cosmos DB. Na přesném úložišti dat nezáleží. Chcete, aby se nezpracované změny zachovaly.

Pomocí estimátoru kanálu změn můžete také sledovat průběh instancí procesoru kanálu změn při čtení kanálu změn nebo můžete použít oznámení životního cyklu k detekci základních selhání.

Oznámení životního cyklu

Procesor kanálu změn můžete připojit k libovolné relevantní události v jejím životním cyklu. Můžete se rozhodnout, že budete upozorněni na jednu nebo všechny z nich. Doporučuje se alespoň zaregistrovat oznámení o chybě:

  • Zaregistrujte obslužnou rutinu, WithLeaseAcquireNotification aby byla upozorněna, když aktuální hostitel získá zapůjčení, aby ho začal zpracovávat.
  • Zaregistrujte obslužnou rutinu, WithLeaseReleaseNotification která má být upozorněna, když aktuální hostitel uvolní zapůjčení a přestane ho zpracovávat.
  • Zaregistrujte obslužnou rutinu, WithErrorNotification aby byla upozorněna, když aktuální hostitel během zpracování narazí na výjimku. Musíte být schopni rozlišit, jestli je zdrojem delegát uživatele (neošetřená výjimka) nebo chyba, se kterou se procesor setká při pokusu o přístup k monitorovanému kontejneru (například problémy se sítí).

Oznámení o životním cyklu jsou dostupná v obou režimech kanálu změn. Tady je příklad oznámení životního cyklu v nejnovějším režimu verze:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Jednotka nasazení

Jedna jednotka nasazení procesoru kanálu změn se skládá z jedné nebo více výpočetních instancí, které mají stejnou hodnotu pro processorName a stejnou konfiguraci kontejneru zapůjčení, ale různé názvy instancí. Můžete mít mnoho jednotek nasazení, ve kterých má každá jednotka jiný obchodní tok pro změny a každá jednotka nasazení se skládá z jedné nebo více instancí.

Můžete mít například jednu jednotku nasazení, která aktivuje externí rozhraní API pokaždé, když v kontejneru dojde ke změně. Jiná jednotka nasazení může přesouvat data v reálném čase pokaždé, když dojde ke změně. Když dojde ke změně v monitorovaném kontejneru, zobrazí se oznámení o všech jednotkách nasazení.

Dynamické škálování

Jak už bylo zmíněno dříve, v rámci jednotky nasazení můžete mít jednu nebo více výpočetních instancí. Pokud chcete využít výhod distribuce výpočetních prostředků v rámci jednotky nasazení, jedinými klíčovými požadavky jsou tyto:

  • Všechny instance musí mít stejnou konfiguraci kontejneru zapůjčení.
  • Všechny instance by měly mít stejnou hodnotu pro processorName.
  • Každá instance musí mít jiný název instance (WithInstanceName).

Pokud se použijí tyto tři podmínky, procesor kanálu změn distribuuje všechna zapůjčení, která jsou v kontejneru zapůjčení, napříč všemi spuštěnými instancemi této jednotky nasazení a paralelizuje výpočetní prostředky pomocí algoritmu stejné distribuce. Zapůjčení je vlastněno jednou instancí, takže počet instancí by neměl být větší než počet zapůjčení.

Počet instancí se může zvětšit a zmenšit. Procesor kanálu změn dynamicky upravuje zatížení tím, že ho odpovídajícím způsobem redistribuuje.

Procesor kanálu změn navíc může dynamicky upravit škálování kontejneru, pokud se zvýší propustnost nebo úložiště kontejneru. Když váš kontejner roste, procesor kanálu změn transparentně zpracovává scénář dynamickým zvýšením zapůjčení a distribucí nových zapůjčení mezi existující instance.

Počáteční čas

Ve výchozím nastavení se při prvním spuštění procesoru kanálu změn inicializuje kontejner zapůjčení a spustí jeho životní cyklus zpracování. Všechny změny, ke kterým došlo v monitorovaném kontejneru předtím, než se procesor kanálu změn inicializuje poprvé, se nezjistí.

Čtení z předchozího data a času

Procesor kanálu změn je možné inicializovat tak, aby četl změny počínaje konkrétním datem a časem předáním instance DateTime WithStartTime rozšíření tvůrce:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Procesor kanálu změn se inicializuje pro konkrétní datum a čas a začne číst změny, ke kterým došlo později.

Čtení od začátku

V jiných scénářích, jako jsou migrace dat nebo při analýze celé historie kontejneru, je potřeba číst informační kanál změn od začátku životnosti tohoto kontejneru. Můžete použít WithStartTime v rozšíření tvůrce, ale předat DateTime.MinValue.ToUniversalTime(), který vygeneruje reprezentaci minimální DateTime hodnoty UTC, jako je v tomto příkladu:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Procesor kanálu změn se inicializuje a začne číst změny od začátku životnosti kontejneru.

Poznámka:

Tyto možnosti přizpůsobení fungují pouze pro nastavení výchozího bodu v čase procesoru kanálu změn. Po prvním inicializaci kontejneru zapůjčení nemá změna těchto možností žádný vliv.

Přizpůsobení výchozího bodu je k dispozici pouze pro režim kanálu změn nejnovější verze. Pokud používáte všechny verze a režim odstranění, musíte začít číst od okamžiku spuštění procesoru nebo obnovit z předchozího stavu zapůjčení, který je v období nepřetržitého uchovávání záloh vašeho účtu.

Kanál změn a zřízená propustnost

Operace čtení kanálu změn u monitorovaných jednotek žádostí spotřebovávají kontejnery. Ujistěte se, že u monitorovaného kontejneru nedochází k omezování. Omezování zvyšuje zpoždění při příjmu událostí kanálu změn v procesorech.

Operace s kontejnerem zapůjčení (aktualizace a údržba stavu) spotřebovávají jednotky žádostí. Čím vyšší je počet instancí, které používají stejný kontejner zapůjčení, tím vyšší je potenciální spotřeba jednotek žádostí. Ujistěte se, že u vašeho kontejneru zapůjčení nedochází k omezování. Omezování přidává zpoždění při příjmu událostí kanálu změn. Omezování může dokonce úplně ukončit zpracování.

Sdílení kontejneru zapůjčení

Kontejner zapůjčení můžete sdílet napříč několika jednotkami nasazení. V kontejneru sdíleného zapůjčení každá jednotka nasazení naslouchá jinému monitorovanému kontejneru nebo má jinou hodnotu .processorName V této konfiguraci každá jednotka nasazení udržuje nezávislý stav v kontejneru zapůjčení. Zkontrolujte spotřebu jednotek žádosti v kontejneru zapůjčení a ujistěte se, že zřízená propustnost stačí pro všechny jednotky nasazení.

Pokročilá konfigurace zapůjčení

Tři konfigurace klíčů můžou ovlivnit fungování procesoru kanálu změn. Každá konfigurace ovlivňuje spotřebu jednotek žádosti v kontejneru zapůjčení. Při vytváření procesoru kanálu změn můžete nastavit jednu z těchto konfigurací, ale pečlivě je použijte:

  • Získání zapůjčení: Ve výchozím nastavení každých 17 sekund. Hostitel pravidelně kontroluje stav úložiště zapůjčení a v rámci procesu dynamického škálování zvažte získání zapůjčení. Tento proces se provádí spuštěním dotazu v kontejneru zapůjčení. Snížením této hodnoty se zvětší vyrovnávání a získání zapůjčení, ale zvýší se spotřeba jednotek žádosti v kontejneru zapůjčení.
  • Vypršení platnosti zapůjčení: Ve výchozím nastavení 60 sekund. Definuje maximální dobu, po kterou může existovat zapůjčení bez jakékoli aktivity obnovení, než ji získá jiný hostitel. Když dojde k chybovému ukončení hostitele, zapůjčení, které vlastní, vyzvedne ostatní hostitelé po uplynutí tohoto časového období a nakonfigurovaného intervalu obnovení. Snížením této hodnoty dojde k rychlejšímu obnovení po chybovém ukončení hostitele, ale hodnota vypršení platnosti by nikdy neměla být nižší než interval obnovení.
  • Prodloužení zapůjčení: Ve výchozím nastavení každých 13 sekund. Hostitel, který vlastní zapůjčení, pravidelně obnovuje zapůjčení, i když neexistují žádné nové změny, které by bylo potřeba využívat. Tento proces se provádí spuštěním příkazu Nahradit v zapůjčení. Snížením této hodnoty se sníží doba potřebná ke zjištění zapůjčení ztracených chybovým ukončením hostitele, ale zvýší se spotřeba jednotek požadavků v kontejneru zapůjčení.

Kde hostovat procesor kanálu změn

Procesor kanálu změn je možné hostovat na libovolné platformě, která podporuje dlouhotrvající procesy nebo úlohy. Několik příkladů:

I když může procesor kanálu změn běžet v krátkodobých prostředích, protože kontejner zapůjčení udržuje stav, cyklus spuštění těchto prostředí přidává zpoždění k době, kterou trvá přijímání oznámení (kvůli režii při každém spuštění procesoru).

Požadavky na přístup na základě role

Pokud jako ověřovací mechanismus používáte ID Microsoft Entra, ujistěte se, že identita má správná oprávnění:

  • V monitorovaném kontejneru:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • V kontejneru zapůjčení:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Další materiály

Další kroky

Další informace o procesoru kanálu změn najdete v následujících článcích: