Eseguire la migrazione dalla libreria di esecuzione bulk al supporto bulk in Azure Cosmos DB .NET V3 SDK
SI APPLICA A: NoSQL
Questo articolo descrive i passaggi necessari per eseguire la migrazione del codice di un'applicazione esistente che usa la libreria di esecuzione bulk .NET alla funzionalità supporto bulk nella versione più recente di .NET SDK.
Abilitare il supporto bulk
Abilitare il supporto bulk nell'istanza di CosmosClient
tramite la configurazione di AllowBulkExecution:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Creare attività per ogni operazione
Il supporto bulk in .NET SDK funziona sfruttando le operazioni di Task Parallel Library e raggruppamento che si verificano simultaneamente.
Non esiste un singolo metodo nell'SDK che accetti l'elenco di documenti o operazioni come parametro di input, ma è invece necessario creare un'attività per ogni operazione che si vuole eseguire in bulk e quindi attenderne semplicemente il completamento.
Ad esempio, se l'input iniziale è un elenco di elementi in cui ogni elemento ha lo schema seguente:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Se si vuole eseguire l'importazione bulk (simile all'uso di BulkExecutor.BulkImportAsync), è necessario disporre di chiamate simultanee verso CreateItemAsync
. Ad esempio:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Se si desidera eseguire l'aggiornamento bulk (simile all'uso di BulkExecutor.BulkUpdateAsync), è necessario disporre di chiamate simultanee al metodo ReplaceItemAsync
dopo l'aggiornamento del valore dell'elemento. Ad esempio:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
E se si desidera eseguire l'eliminazione bulk (analogamente all'uso di BulkExecutor.BulkDeleteAsync), è necessario avere chiamate simultanee a DeleteItemAsync
, con id
e la chiave di partizione di ogni elemento. Ad esempio:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Acquisire lo stato del risultato dell'attività
Negli esempi di codice precedenti è stato creato un elenco simultaneo di attività e viene chiamato il metodo CaptureOperationResponse
in ognuna di queste attività. Questo metodo è un'estensione che consente di mantenere uno schema di risposta simile come BulkExecutor, acquisendo eventuali errori e monitorando l'utilizzo delle unità richiesta.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Dove OperationResponse
viene dichiarato come:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Eseguire operazioni contemporaneamente
Per tenere traccia dell'ambito dell'intero elenco di attività, viene usata questa classe helper:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Il metodo ExecuteAsync
attenderà il completamento di tutte le operazioni ed è possibile usarlo come segue:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Acquisire statistiche
Il codice precedente attende il completamento di tutte le operazioni e calcola le statistiche necessarie. Queste statistiche sono simili a quelle della libreria bulk executor BulkImportResponse.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse
contiene:
- Tempo totale impiegato per elaborare l'elenco delle operazioni tramite il supporto bulk.
- Numero di operazioni riuscite.
- Totale delle unità richiesta utilizzate.
- In caso di errori, viene visualizzato un elenco di tuple che contengono l'eccezione e l'elemento associato per la registrazione e l'identificazione.
Configurazione retry
La libreria di esecuzione bulk ha linee guida indicate per impostare MaxRetryWaitTimeInSeconds
e MaxRetryAttemptsOnThrottledRequests
di RetryOptions su 0
per delegare il controllo alla libreria.
Per il supporto bulk in .NET SDK, non esiste alcun comportamento nascosto. È possibile configurare le opzioni di retry direttamente tramite CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Nota
Nei casi in cui le unità richiesta di cui è stato effettuato il provisioning sono molto inferiori al previsto in base alla quantità di dati, è consigliabile impostare su elevati questi valori. L'operazione bulk richiederà più tempo, ma ha una maggiore probabilità di successo per via dei retry più elevati.
Miglioramenti delle prestazioni
Come per altre operazioni con .NET SDK, l'uso delle API di flusso comporta prestazioni migliori ed evita qualsiasi serializzazione non necessaria.
L'uso delle API di flusso è possibile solo se la natura dei dati usati corrisponde a quella di un flusso di byte (ad esempio, flussi di file). In questi casi, l'uso dei metodi CreateItemStreamAsync
, ReplaceItemStreamAsync
o DeleteItemStreamAsync
e l'uso di ResponseMessage
(invece di ItemResponse
) aumenta la velocità effettiva che è possibile ottenere.
Passaggi successivi
- Per altre informazioni sulle versioni di .NET SDK, vedere l'articolo Azure Cosmos DB SDK.
- Ottenere il codice sorgente completo della migrazione da GitHub.
- Altri esempi bulk in GitHub
- Si sta tentando di pianificare la capacità per una migrazione ad Azure Cosmos DB?
- Se si conosce solo il numero di vcore e server nel cluster di database esistente, leggere le informazioni sulla stima delle unità richieste usando vCore o vCPU
- Se si conosce la frequenza delle richieste tipiche per il carico di lavoro corrente del database, leggere le informazioni sulla stima delle unità richieste con lo strumento di pianificazione della capacità di Azure Cosmos DB