你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

从批量执行工具库迁移到 Azure Cosmos DB .NET V3 SDK 中的批量操作支持

适用范围: NoSQL

本文介绍需要执行哪些步骤,才能将使用 .NET 批量执行工具库的现有应用程序的代码迁移到使用最新版 .NET SDK 中的批量操作支持功能。

启用批量操作支持

通过 AllowBulkExecution 配置在 CosmosClient 实例上启用批量操作支持:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

为每个操作创建任务

通过利用任务并行库,并将并发执行的操作分组,.NET SDK 中的批量操作支持得以发挥作用。

SDK 中没有任何一种方法可将文档或操作的列表用作输入参数,你需要针对要批量执行的每个操作创建一个任务,然后等待它们完成。

例如,如果初始输入是一个项列表,其中的每个项采用以下架构:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

若要执行批量导入(类似于使用 BulkExecutor.BulkImportAsync),需要对 CreateItemAsync 发出并发调用。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

若要执行批量更新(类似于使用 BulkExecutor.BulkUpdateAsync),则在更新项值后,需要对 ReplaceItemAsync 方法发出并发调用。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

若要执行批量删除(类似于使用 BulkExecutor.BulkDeleteAsync),需要使用每个项的 id 和分区键对 DeleteItemAsync 发出并发调用。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

捕获任务结果状态

在前面的代码示例中,我们已创建一个并发任务列表,并对其中的每个任务调用了 CaptureOperationResponse 方法。 此方法是一个扩展,可让我们通过捕获任何错误并跟踪请求单位用量来保持与 BulkExecutor 类似的响应架构。

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

其中,OperationResponse 声明为:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

并发执行操作

为了跟踪整个 Tasks 列表的作用域,我们使用以下帮助程序类:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

ExecuteAsync 方法会等待所有操作完成,你可以像这样使用它:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

捕获统计信息

以上代码将等到所有操作完成,然后计算所需的统计信息。 这些统计信息类似于批量执行工具库的 BulkImportResponse 的统计信息。

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse 包含:

  1. 通过批量操作支持处理操作列表所用的总时间。
  2. 成功的操作数目。
  3. 消耗的请求单位总数。
  4. 如果发生失败,它会显示一个元组列表,其中包含了异常和关联的项,以用于日志记录和问题识别目的。

重试配置

批量执行工具库提供了指导,其中指出,需要将 RetryOptionsMaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequests 设置为 0,以将控制权委托给该库。

对于 .NET SDK 中的批量操作支持,不存在隐藏的行为。 可以直接通过 CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequestsCosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests 配置重试选项。

注意

如果从数据量来看,预配的请求单位数远远低于预期数量,则你可能需要考虑将这些参数设置为较高的值。 批量操作会花费更长时间,但由于重试次数较高,该操作成功完成的可能性较高。

性能改进

与 .NET SDK 中的其他操作一样,使用流 API 可以提高性能,并避免任何不必要的序列化。

仅当所用数据的性质与字节流(例如文件流)的性质匹配时,才可以使用流 API。 在这种情况下,使用 CreateItemStreamAsyncReplaceItemStreamAsyncDeleteItemStreamAsync 方法并使用 ResponseMessage(而不是 ItemResponse)可以增加可实现的吞吐量。

后续步骤