Condividi tramite


Eseguire la migrazione dalla libreria del processore del feed di modifiche ad Azure Cosmos DB .NET V3 SDK

SI APPLICA A: NoSQL

Questo articolo illustra i passaggi necessari per eseguire la migrazione del codice di un'applicazione esistente che usa la libreria del processore del feed di modifiche verso la funzionalità del feed di modifiche nella versione più recente di .NET SDK (denominata anche .NET V3 SDK).

Modifiche al codice necessarie

.NET V3 SDK presenta diverse modifiche di rilievo. Di seguito sono riportati i passaggi principali per eseguire la migrazione dell'applicazione:

  1. Convertire le istanze di DocumentCollectionInfo in riferimenti Container per i contenitori monitorati e lease.
  2. Le personalizzazioni che usano WithProcessorOptions devono essere aggiornate per l'uso WithLeaseConfiguration e WithPollInterval per gli intervalli, WithStartTime per l'ora di inizio e WithMaxItems per definire il numero massimo di elementi.
  3. Impostare processorName su GetChangeFeedProcessorBuilder in modo che corrisponda al valore configurato in ChangeFeedProcessorOptions.LeasePrefix, in alternativa usare string.Empty.
  4. Le modifiche non vengono più recapitate come IReadOnlyList<Document>, ma come una IReadOnlyCollection<T> in cui T è un tipo che è necessario definire, non è più presente una classe di elementi di base.
  5. Per gestire le modifiche, non è più necessaria un'implementazione di IChangeFeedObserver, ma è necessario definire un delegato. Il delegato può essere una funzione statica o, se è necessario mantenere lo stato tra le esecuzioni, è possibile creare una classe personalizzata e passare un metodo di istanza come delegato.

Ad esempio, se il codice originale per compilare il processore del feed di modifiche è simile al seguente:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Il codice migrato verrà visualizzato come riportato di seguito:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Per il delegato, è possibile avere un metodo statico per ricevere gli eventi. Se si utilizzano informazioni provenienti da IChangeFeedObserverContext è possibile eseguire la migrazione per utilizzare ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken può essere usato in sostituzione di IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers può essere usato in sostituzione di IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics contiene informazioni dettagliate sulla latenza delle richieste per la risoluzione dei problemi
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Eventi di integrità e osservabilità

Se in precedenza si utilizzava IHealthMonitor o si usavano IChangeFeedObserver.OpenAsync e IChangeFeedObserver.CloseAsync, usare l'API Notifiche.

  • IChangeFeedObserver.OpenAsync può essere sostituito con WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync può essere sostituito con WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync può essere sostituito con WithErrorNotification.

Contenitore di stato e lease

Analogamente alla libreria del processore del feed di modifiche, la funzionalità del feed di modifiche in .NET V3 SDK usa un contenitore lease per memorizzare lo stato. Tuttavia, gli schemi sono diversi.

Il processore del feed di modifiche V3 dell'SDK rileverà qualsiasi stato della libreria precedente ed eseguirà automaticamente la migrazione verso il nuovo schema alla prima esecuzione del codice dell'applicazione migrata.

È possibile arrestare in modo sicuro l'applicazione usando il codice precedente, eseguire la migrazione del codice alla nuova versione, avviare l'applicazione migrata e tutte le modifiche apportate mentre l'applicazione è stata arrestata verranno prelevate ed elaborate dalla nuova versione.

Risorse aggiuntive

Passaggi successivi

È ora possibile procedere ad acquisire altre informazioni sul processore di feed di modifiche negli articoli seguenti: