Delen via


Wijzigingenfeed verwerken in Azure Blob Storage

Wijzigingenfeed biedt transactielogboeken van alle wijzigingen die zich voordoen in de blobs en de blobmetagegevens in uw opslagaccount. In dit artikel leest u hoe u wijzigingenfeedrecords leest met behulp van de processorbibliotheek voor de blob-wijzigingenfeed.

Zie Wijzigingenfeed in Azure Blob Storage voor meer informatie over de wijzigingenfeed.

Uw project instellen

In deze sectie wordt uitgelegd hoe u een project voorbereidt voor gebruik met de clientbibliotheek van de Blobs-wijzigingenfeed voor .NET.

Pakketten installeren

Installeer vanuit de projectmap het pakket voor de Clientbibliotheek van de Azure Storage-blobs-wijzigingenfeed voor .NET met behulp van de dotnet add package opdracht. In dit voorbeeld voegen we de --prerelease vlag toe aan de opdracht om de nieuwste preview-versie te installeren.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

In de codevoorbeelden in dit artikel worden ook de Azure Blob Storage - en Azure Identity-pakketten gebruikt.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Voeg using-instructies toe

Voeg de volgende using instructies toe aan uw codebestand:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Een clientobject maken

Als u de toepassing wilt verbinden met Blob Storage, maakt u een exemplaar van de BlobServiceClient klasse. In het volgende voorbeeld ziet u hoe u een clientobject maakt met behulp van DefaultAzureCredential autorisatie. Zie Toegang autoriseren en verbinding maken met Blob Storage voor meer informatie. Als u met de wijzigingenfeed wilt werken, hebt u ingebouwde Azure RBAC-rol Storage Blob Data Reader of hoger nodig.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

Het clientobject wordt doorgegeven als een parameter aan een aantal van de methoden die in dit artikel worden weergegeven.

Records lezen in de wijzigingenfeed

Notitie

De wijzigingenfeed is een onveranderbare en alleen-lezen entiteit in uw opslagaccount. Elk gewenst aantal toepassingen kan de wijzigingenfeed op eigen gemak en onafhankelijk lezen en verwerken. Records worden niet verwijderd uit de wijzigingenfeed wanneer een toepassing deze leest. De lees- of iteratiestatus van elke verbruikende lezer is onafhankelijk en wordt alleen onderhouden door uw toepassing.

Het volgende codevoorbeeld doorloopt alle records in de wijzigingenfeed, voegt deze toe aan een lijst en retourneert vervolgens de lijst met wijzigingenfeed-gebeurtenissen:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

In het volgende codevoorbeeld worden enkele waarden uit de lijst met wijzigingenfeed-gebeurtenissen afgedrukt:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Leesrecords van een opgeslagen positie hervatten

U kunt ervoor kiezen om uw leespositie op te slaan in de wijzigingenfeed en vervolgens op een later moment door de records te doorlopen. U kunt de leespositie opslaan door de cursor van de wijzigingenfeed op te halen. De cursor is een tekenreeks en uw toepassing kan die tekenreeks op elke manier opslaan die zinvol is voor het ontwerp van uw toepassing, bijvoorbeeld in een bestand of database.

In dit voorbeeld worden alle records in de wijzigingenfeed herhaald, toegevoegd aan een lijst en wordt de cursor opgeslagen. De lijst en de cursor worden teruggezet naar de aanroeper.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Stroomverwerking van records

U kunt ervoor kiezen om wijzigingenfeedrecords te verwerken terwijl ze zijn doorgevoerd in de wijzigingenfeed. Zie specificaties. De wijzigingsevenementen worden gemiddeld op een periode van 60 seconden naar de wijzigingenfeed gepubliceerd. We raden u aan nieuwe wijzigingen te peilen met deze periode in gedachten wanneer u uw poll-interval opgeeft.

In dit voorbeeld worden regelmatig polls naar wijzigingen. Als er wijzigingsrecords bestaan, verwerkt deze code deze records en slaat u de cursor van de wijzigingenfeed op. Als het proces wordt gestopt en vervolgens opnieuw wordt gestart, kan de toepassing de cursor gebruiken om de verwerking van records te hervatten waar het voor het laatst was gebleven. In dit voorbeeld wordt de cursor voor demonstratiedoeleinden opgeslagen in een lokaal bestand, maar uw toepassing kan deze opslaan in elke vorm die het meest zinvol is voor uw scenario.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Records binnen een bepaald tijdsbereik lezen

U kunt records lezen die binnen een bepaald tijdsbereik vallen. In dit voorbeeld worden alle records in de wijzigingenfeed herhaald die binnen een specifiek datum- en tijdsbereik vallen, worden ze toegevoegd aan een lijst en wordt de lijst geretourneerd:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

De begintijd die u opgeeft, wordt naar beneden afgerond op het dichtstbijzijnde uur en de eindtijd wordt naar boven afgerond op het dichtstbijzijnde uur. Het is mogelijk dat gebruikers gebeurtenissen zien die plaatsvonden vóór de begintijd en na de eindtijd. Het is ook mogelijk dat sommige gebeurtenissen die optreden tussen de begin- en eindtijd niet worden weergegeven. Dat komt doordat gebeurtenissen kunnen worden vastgelegd tijdens het uur voorafgaand aan de begintijd of tijdens het uur na de eindtijd.

Volgende stappen