Compartilhar via


Processar o feed de alterações no Armazenamento de Blobs do Azure

O feed de alterações fornece os logs de transações de todas as alterações que ocorrem nos blobs e nos metadados de blob na conta de armazenamento. Este artigo mostra como ler os registros do feed de alterações usando a biblioteca do processador do feed de alterações de blob.

Para saber mais sobre o feed de alterações, confira o Feed de alterações no Armazenamento de Blobs do Azure.

Configurar o seu projeto

Esta seção orienta você na preparação de um projeto para trabalhar com a biblioteca de clientes do Feed de Alterações de Blobs para .NET.

Instalar Pacotes

No diretório do projeto, instale o pacote para a biblioteca de clientes do Feed de Alterações de Blobs de Armazenamento do Azure para .NET usando o comando dotnet add package. Neste exemplo, adicionamos o sinalizador --prerelease ao comando para instalar a versão prévia mais recente.

dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease

Os exemplos de código neste artigo também usam os pacotes de Armazenamento de Blobs do Azure e de Identidade do Azure.

dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs

Adicione diretivas using

Adicione as seguintes diretivas using ao arquivo de código:

using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

Criar um objeto cliente

Para conectar o aplicativo ao Armazenamento de Blobs, crie uma instância da classe BlobServiceClient. O exemplo a seguir mostra como criar um objeto cliente usando DefaultAzureCredential para autorização. Para saber mais, confira Autorizar o acesso e se conectar ao Armazenamento de Blobs. Para trabalhar com o feed de alterações, você precisa da função interna do RBAC do Azure de Leitor de dados de Blob de Armazenamento ou superior.

// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
        new Uri($"https://{accountName}.blob.core.windows.net"),
        new DefaultAzureCredential());

O objeto cliente é passado como um parâmetro para alguns dos métodos mostrados neste artigo.

Ler registros no feed de alterações

Observação

O feed de alterações é uma entidade imutável e somente leitura na conta de armazenamento. Qualquer quantidade de aplicativos pode ler e processar o feed de alterações simultaneamente e de maneira independente, como preferir. Os registros não são removidos do feed de alterações quando lidos por um aplicativo. O estado de leitura ou de iteração de cada leitor de consumo é independente e mantido somente pelo próprio aplicativo.

O exemplo de código a seguir itera em todos os registros no feed de alterações, adiciona-os a uma lista e retorna a lista de eventos do feed de alterações:

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

O exemplo de código a seguir imprime alguns valores da lista de eventos do feed de alterações:

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Operation: " + operationName.ToString());
    }
}

Continuar a leitura de registros de uma posição salva

Você pode optar por salvar sua posição de leitura no feed de alterações e retomar a iteração pelos registros em um momento futuro. Você pode salvar a posição de leitura obtendo o cursor do feed de alterações. O cursor é uma cadeia de caracteres e o aplicativo pode salvar essa cadeia de qualquer maneira que faça sentido para o design do aplicativo, por exemplo, para um arquivo ou um banco de dados.

Este exemplo itera por todos os registros no feed de alterações, os adiciona uma lista e salva o cursor. A lista e o cursor são retornados para o chamador.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Transmitir o processamento de registros

Você pode optar por processar os registros do feed de alterações à medida em que eles são confirmados no feed de alterações. Confira as Especificações. Os eventos de alteração são publicados no feed de alterações em um período de 60 segundos em média. É recomendável que você pesquise novas alterações com esse período em mente ao especificar o intervalo de pesquisa.

Este exemplo pesquisa as alterações periodicamente. Se houver registros de alteração, esse código processa os registros e salva o cursor do feed de alterações. Dessa forma, se o processo for interrompido e iniciado novamente, o aplicativo poderá usar o cursor para retomar os registros de processamento onde ele parou pela última vez. Este exemplo salva o cursor em um arquivo local para fins de demonstração, mas seu aplicativo pode salvá-lo em qualquer forma que faça mais sentido para seu cenário.

public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Helper method to save cursor
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}

Ler registros em um intervalo de tempo específico

Você pode ler os registros que se enquadram em um intervalo de tempo específico. Este exemplo itera todos os registros no feed de alterações que se enquadram em um intervalo de data e hora específico, adiciona-os a uma lista e retorna a lista:

async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

A hora de início que você fornece é arredondada para baixo, para a hora mais próxima, e a hora de término é arredondada para cima, para a hora mais próxima. É possível que os usuários vejam eventos que ocorreram antes da hora de início e após a hora de término. Também é possível que alguns eventos que ocorreram entre a hora de início e de término não apareçam. Isso ocorre porque os eventos podem ter sido registrados durante a hora anterior à hora de início ou durante a hora posterior à hora de término.

Próximas etapas