Sdílet prostřednictvím


Kanál změn procesů ve službě Azure Blob Storage

Kanál změn poskytuje transakční protokoly všech změn, ke kterým dochází u objektů blob a metadat objektů blob ve vašem účtu úložiště. V tomto článku se dozvíte, jak číst záznamy kanálu změn pomocí knihovny procesoru kanálu změn objektů blob.

Další informace o kanálu změn najdete v tématu Kanál změn ve službě Azure Blob Storage.

Nastavení projektu

Tato část vás provede přípravou projektu pro práci s klientskou knihovnou kanálu změn objektů blob pro .NET.

Instalace balíčků

Z adresáře projektu pomocí příkazu nainstalujte balíček pro klientskou knihovnu dotnet add package kanálu změn Azure Storage Blobs pro .NET. V tomto příkladu --prerelease přidáme příznak k příkazu pro instalaci nejnovější verze Preview.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Příklady kódu v tomto článku také používají balíčky Azure Blob Storage a Azure Identity .

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Přidání using direktiv

Do souboru kódu přidejte následující using direktivy:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Vytvoření objektu klienta

Pokud chcete aplikaci připojit ke službě Blob Storage, vytvořte instanci BlobServiceClient třídy. Následující příklad ukazuje, jak vytvořit objekt klienta pro DefaultAzureCredential autorizaci. Další informace najdete v tématu Autorizace přístupu a připojení ke službě Blob Storage. Pokud chcete pracovat s kanálem změn, potřebujete integrovanou roli Čtenář dat objektů blob služby Storage nebo vyšší role Azure RBAC.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

Objekt klienta se předává jako parametr některým metodám uvedeným v tomto článku.

Čtení záznamů v kanálu změn

Poznámka:

Kanál změn je neměnná a jen pro čtení entita ve vašem účtu úložiště. Libovolný počet aplikací může číst a zpracovávat informační kanál změn současně a nezávisle na svých vlastních pohodlích. Záznamy se z kanálu změn neodeberou, když je aplikace přečte. Stav čtení nebo iterace každého čtenáře, který využívá, je nezávislý a spravovaný pouze vaší aplikací.

Následující příklad kódu iteruje všechny záznamy v kanálu změn, přidá je do seznamu a vrátí seznam událostí kanálu změn:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Následující příklad kódu vytiskne některé hodnoty ze seznamu událostí kanálu změn:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Obnovení čtení záznamů z uložené pozice

Umístění pro čtení můžete uložit v informačním kanálu změn a pak pokračovat v iterování záznamů v budoucnu. Umístění pro čtení můžete uložit získáním kurzoru kanálu změn. Kurzor je řetězec a aplikace může tento řetězec uložit jakýmkoli způsobem, který dává smysl pro návrh vaší aplikace, například do souboru nebo databáze.

Tento příklad prochází všechny záznamy v kanálu změn, přidá je do seznamu a uloží kurzor. Volajícímu se vrátí seznam a kurzor.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Zpracování datových proudů záznamů

Můžete se rozhodnout zpracovávat záznamy kanálu změn při jejich potvrzení do kanálu změn. Viz specifikace. Události změn se publikují v kanálu změn v průměru po dobu 60 sekund. Při zadávání intervalu hlasování doporučujeme dotazovat se na nové změny s ohledem na toto období.

Tento příklad se pravidelně dotazuje na změny. Pokud existují záznamy změn, tento kód tyto záznamy zpracuje a uloží kurzor kanálu změn. Tímto způsobem, pokud je proces zastaven a znovu spuštěn, může aplikace pomocí kurzoru pokračovat ve zpracování záznamů, kde naposledy skončila. Tento příklad uloží kurzor do místního souboru pro demonstrační účely, ale vaše aplikace ji může uložit v libovolném formuláři, který dává pro váš scénář největší smysl.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Čtení záznamů v určitém časovém rozsahu

Můžete číst záznamy, které spadají do určitého časového rozsahu. Tento příklad iteruje všechny záznamy v kanálu změn, které spadají do určitého data a časového rozsahu, přidá je do seznamu a vrátí seznam:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

Zadaný počáteční čas se zaokrouhlí dolů na nejbližší hodinu a koncový čas se zaokrouhlí nahoru na nejbližší hodinu. Je možné, že uživatelé můžou vidět události, ke kterým došlo před počátečním časem a po koncovém čase. Je také možné, že se některé události, ke kterým dochází mezi počátečním a koncovým časem, nezobrazí. Důvodem je to, že události můžou být zaznamenány během hodiny předcházející počátečnímu času nebo během hodiny po koncovém čase.

Další kroky