Compartir vía


Migración de la biblioteca Bulk Executor a la compatibilidad con la ejecución en bloque del SDK para .NET v3 de Azure Cosmos DB

SE APLICA A: NoSQL

En este artículo se describen los pasos necesarios para migrar el código de una aplicación existente que usa la biblioteca Bulk Executor de .NET a la característica de compatibilidad con la ejecución en bloque en la versión más reciente del SDK para .NET.

Habilitar la compatibilidad con la ejecución en bloque

Habilite la compatibilidad con la ejecución en bloque en la instancia de CosmosClient mediante la configuración de AllowBulkExecution:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Creación de tareas para cada operación

La compatibilidad con la ejecución en bloque del SDK para .NET funciona mediante el uso de la biblioteca TPL y las operaciones de agrupación que se producen simultáneamente.

No hay ningún método único en el SDK que use su lista de documentos u operaciones como parámetro de entrada, sino que deberá crear una tarea para cada operación que quiera ejecutar de forma masiva. A continuación, simplemente, espere a que se completen.

Por ejemplo, si la entrada inicial es una lista de elementos en que cada elemento tiene el siguiente esquema:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Si quiere realizar una importación masiva (una opción similar a usar BulkExecutor.BulkImportAsync), debe tener llamadas simultáneas a CreateItemAsync. Por ejemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Si quiere realizar una actualización en bloque (una opción similar a usar BulkExecutor.BulkUpdateAsync), necesita tener llamadas simultáneas al método ReplaceItemAsync después de actualizar el valor del elemento. Por ejemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Y, si quiere realizar una eliminación en bloque (una opción similar a usar BulkExecutor.BulkDeleteAsync), necesita tener llamadas simultáneas a DeleteItemAsync, con id y la clave de partición de cada elemento. Por ejemplo:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Captura del estado del resultado de la tarea

En los ejemplos de código anteriores, hemos creado una lista de tareas simultáneas y ha llamado al método CaptureOperationResponse en cada una de esas tareas. Este método es una extensión que nos permite mantener un esquema de respuesta similar como BulkExecutor mediante la captura de los errores y el seguimiento del uso de las unidades de solicitud.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Donde OperationResponse se declara como:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Ejecución de operaciones simultáneas

Para realizar el seguimiento del ámbito de toda la lista de tareas, usamos esta clase auxiliar:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

El método ExecuteAsync esperará hasta que se completen todas las operaciones y se pueda usar de la siguiente manera:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Captura de estadísticas

El código anterior espera hasta que se completan todas las operaciones y calcula las estadísticas necesarias. Estas estadísticas son similares a las del elemento BulkImportResponse de la biblioteca Bulk Executor.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse contiene:

  1. El tiempo total que se tarda en procesar la lista de operaciones mediante la compatibilidad con la ejecución en bloque.
  2. El número de operaciones completadas correctamente.
  3. El total de unidades de la solicitud consumidas.
  4. Si hay errores, muestra una lista de tuplas que contienen la excepción y el elemento asociado para fines de registro e identificación.

Configuración de reintento

La biblioteca Bulk Executor tenía una guía que mencionaba que se debían establecer los valores de MaxRetryWaitTimeInSeconds y MaxRetryAttemptsOnThrottledRequests de RetryOptions en 0 para delegar el control a la biblioteca.

Para la compatibilidad con la ejecución en bloque del SDK para .NET, no hay ningún comportamiento oculto. Puede configurar las opciones de reintento directamente mediante CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests y CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Nota:

En los casos en los que las unidades de solicitud aprovisionadas están muy por debajo de las esperadas según la cantidad de datos, considere la posibilidad de definir valores altos para estas opciones. La operación en bloque tardará más, pero tendrá una mayor probabilidad de completarse correctamente debido a un valor de reintentos mayor.

Mejoras en el rendimiento

Del mismo modo que con otras operaciones con el SDK para .NET, el uso de API de flujo da como resultado un mejor rendimiento y evita cualquier serialización innecesaria.

El uso de API de flujo solo es posible si la naturaleza de los datos que usa coincide con la de un flujo de bytes (por ejemplo, flujos de archivos). En tales casos, el uso de los métodos CreateItemStreamAsync, ReplaceItemStreamAsync o DeleteItemStreamAsync y el trabajo con ResponseMessage (en lugar de ItemResponse) aumenta el rendimiento que se puede conseguir.

Pasos siguientes