Compartir vía


Migración desde la biblioteca de procesadores de fuente de cambios al SDK de Azure Cosmos DB para .NET V3

SE APLICA A: NoSQL

En este artículo se describen los pasos necesarios para migrar el código de una aplicación existente que usa la biblioteca de procesadores de fuente de cambios a la característica de fuente de cambios en la versión más reciente del SDK de .NET (que también se conoce como el SDK de .NET V3).

Cambios de código necesarios

El SDK de .NET V3 tiene varios cambios importantes; estos son los pasos principales para migrar la aplicación:

  1. Convierta las instancias de DocumentCollectionInfo en referencias de Container para los contenedores supervisados y de concesiones.
  2. Las personalizaciones que usan WithProcessorOptions se deben actualizar para usar WithLeaseConfiguration y WithPollInterval para los intervalos, WithStartTimepara la hora de inicio y WithMaxItems para definir el número máximo de elementos.
  3. Establezca el processorName en GetChangeFeedProcessorBuilder para que coincida con el valor configurado en ChangeFeedProcessorOptions.LeasePrefix, o bien, puede usar string.Empty.
  4. Los cambios ya no se entregan como IReadOnlyList<Document> sino que, en su lugar, se trata de un IReadOnlyCollection<T> donde T es un tipo que debe definir. Ya no hay ninguna clase de elemento base.
  5. Para controlar los cambios, ya no necesita una implementación de IChangeFeedObserver, sino que debe definir un delegado. El delegado puede ser una función estática o, si necesita mantener el estado entre las ejecuciones, puede crear su propia clase y pasar un método de instancia como delegado.

Por ejemplo, si el código original para compilar el procesador de fuente de cambios tiene el siguiente aspecto:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

El código migrado tendrá el siguiente aspecto:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Para el delegado, puede tener un método estático para recibir los eventos. Si fuera a consumir información de IChangeFeedObserverContext, puede migrarla para usar ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken se puede usar en lugar de IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers se puede usar en lugar de IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics contiene información detallada sobre la latencia de solicitud para la solución de problemas
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Eventos de mantenimiento y observabilidad

Si anteriormente usaba IHealthMonitor o aprovechaba IChangeFeedObserver.OpenAsync y IChangeFeedObserver.CloseAsync, use la API de notificaciones.

  • IChangeFeedObserver.OpenAsync se puede reemplazar por WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync se puede reemplazar por WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync se puede reemplazar por WithErrorNotification.

Contenedor de estado y concesión

De manera similar a la biblioteca de procesadores de fuente de cambios, la característica de fuente de cambios del SDK de .NET V3 usa un contenedor de concesión para almacenar el estado. Sin embargo, los esquemas son diferentes.

El procesador de fuente de cambios del SDK V3 detectará cualquier estado de la biblioteca anterior y lo migrará al esquema nuevo de manera automática en la primera ejecución del código de aplicación migrada.

Puede detener la aplicación de manera segura con el código anterior, migrar el código a la versión nueva, iniciar la aplicación migrada y todos los cambios que se hayan producido mientras se detuvo la aplicación se recogerán y procesarán con la versión nueva.

Recursos adicionales

Pasos siguientes

Puede obtener más información sobre el procesador de la fuente de cambios en los siguientes artículos: