Condividi tramite


Eseguire la migrazione dalla libreria di esecuzione bulk al supporto bulk in Azure Cosmos DB .NET V3 SDK

SI APPLICA A: NoSQL

Questo articolo descrive i passaggi necessari per eseguire la migrazione del codice di un'applicazione esistente che usa la libreria di esecuzione bulk .NET alla funzionalità supporto bulk nella versione più recente di .NET SDK.

Abilitare il supporto bulk

Abilitare il supporto bulk nell'istanza di CosmosClient tramite la configurazione di AllowBulkExecution:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Creare attività per ogni operazione

Il supporto bulk in .NET SDK funziona sfruttando le operazioni di Task Parallel Library e raggruppamento che si verificano simultaneamente.

Non esiste un singolo metodo nell'SDK che accetti l'elenco di documenti o operazioni come parametro di input, ma è invece necessario creare un'attività per ogni operazione che si vuole eseguire in bulk e quindi attenderne semplicemente il completamento.

Ad esempio, se l'input iniziale è un elenco di elementi in cui ogni elemento ha lo schema seguente:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Se si vuole eseguire l'importazione bulk (simile all'uso di BulkExecutor.BulkImportAsync), è necessario disporre di chiamate simultanee verso CreateItemAsync. Ad esempio:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Se si desidera eseguire l'aggiornamento bulk (simile all'uso di BulkExecutor.BulkUpdateAsync), è necessario disporre di chiamate simultanee al metodo ReplaceItemAsync dopo l'aggiornamento del valore dell'elemento. Ad esempio:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

E se si desidera eseguire l'eliminazione bulk (analogamente all'uso di BulkExecutor.BulkDeleteAsync), è necessario avere chiamate simultanee a DeleteItemAsync, con id e la chiave di partizione di ogni elemento. Ad esempio:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Acquisire lo stato del risultato dell'attività

Negli esempi di codice precedenti è stato creato un elenco simultaneo di attività e viene chiamato il metodo CaptureOperationResponse in ognuna di queste attività. Questo metodo è un'estensione che consente di mantenere uno schema di risposta simile come BulkExecutor, acquisendo eventuali errori e monitorando l'utilizzo delle unità richiesta.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Dove OperationResponse viene dichiarato come:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Eseguire operazioni contemporaneamente

Per tenere traccia dell'ambito dell'intero elenco di attività, viene usata questa classe helper:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Il metodo ExecuteAsync attenderà il completamento di tutte le operazioni ed è possibile usarlo come segue:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Acquisire statistiche

Il codice precedente attende il completamento di tutte le operazioni e calcola le statistiche necessarie. Queste statistiche sono simili a quelle della libreria bulk executor BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse contiene:

  1. Tempo totale impiegato per elaborare l'elenco delle operazioni tramite il supporto bulk.
  2. Numero di operazioni riuscite.
  3. Totale delle unità richiesta utilizzate.
  4. In caso di errori, viene visualizzato un elenco di tuple che contengono l'eccezione e l'elemento associato per la registrazione e l'identificazione.

Configurazione retry

La libreria di esecuzione bulk ha linee guida indicate per impostare MaxRetryWaitTimeInSeconds e MaxRetryAttemptsOnThrottledRequests di RetryOptions su 0 per delegare il controllo alla libreria.

Per il supporto bulk in .NET SDK, non esiste alcun comportamento nascosto. È possibile configurare le opzioni di retry direttamente tramite CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Nota

Nei casi in cui le unità richiesta di cui è stato effettuato il provisioning sono molto inferiori al previsto in base alla quantità di dati, è consigliabile impostare su elevati questi valori. L'operazione bulk richiederà più tempo, ma ha una maggiore probabilità di successo per via dei retry più elevati.

Miglioramenti delle prestazioni

Come per altre operazioni con .NET SDK, l'uso delle API di flusso comporta prestazioni migliori ed evita qualsiasi serializzazione non necessaria.

L'uso delle API di flusso è possibile solo se la natura dei dati usati corrisponde a quella di un flusso di byte (ad esempio, flussi di file). In questi casi, l'uso dei metodi CreateItemStreamAsync, ReplaceItemStreamAsync o DeleteItemStreamAsync e l'uso di ResponseMessage (invece di ItemResponse) aumenta la velocità effettiva che è possibile ottenere.

Passaggi successivi