Migrar da biblioteca de executores em massa para o suporte em massa no SDK do Azure Cosmos DB .NET V3
APLICA-SE A: NoSQL
Este artigo descreve as etapas necessárias para migrar o código de um aplicativo existente que usa a biblioteca de executores em massa do .NET para o recurso de suporte em massa na versão mais recente do SDK do .NET.
Habilite o suporte em massa
Habilite o suporte em massa na CosmosClient
instância por meio da configuração AllowBulkExecution :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Criar tarefas para cada operação
O suporte em massa no SDK do .NET funciona aproveitando a Biblioteca Paralela de Tarefas e agrupando operações que ocorrem simultaneamente.
Não há um único método no SDK que tomará sua lista de documentos ou operações como um parâmetro de entrada, mas sim, você precisa criar uma tarefa para cada operação que deseja executar em massa e, em seguida, simplesmente esperar que eles sejam concluídos.
Por exemplo, se a entrada inicial for uma lista de itens em que cada item tem o seguinte esquema:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Se você quiser fazer a importação em massa (semelhante ao uso de BulkExecutor.BulkImportAsync), precisará ter chamadas simultâneas para CreateItemAsync
. Por exemplo:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Se você quiser fazer a atualização em massa (semelhante ao uso de BulkExecutor.BulkUpdateAsync), você precisa ter chamadas simultâneas para o ReplaceItemAsync
método depois de atualizar o valor do item. Por exemplo:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
E se você quiser fazer a exclusão em massa (semelhante ao uso de BulkExecutor.BulkDeleteAsync), você precisa ter chamadas simultâneas para DeleteItemAsync
, com a id
chave e partição de cada item. Por exemplo:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Capturar o estado do resultado da tarefa
Nos exemplos de código anteriores, criamos uma lista simultânea de tarefas e chamamos o CaptureOperationResponse
método em cada uma dessas tarefas. Este método é uma extensão que nos permite manter um esquema de resposta semelhante ao BulkExecutor, capturando quaisquer erros e rastreando o uso das unidades de solicitação.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Se o for declarado OperationResponse
como:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Executar operações simultaneamente
Para controlar o escopo de toda a lista de Tarefas, usamos esta classe auxiliar:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
O ExecuteAsync
método aguardará até que todas as operações sejam concluídas e você poderá usá-lo assim:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Capturar estatísticas
O código anterior aguarda até que todas as operações sejam concluídas e calcula as estatísticas necessárias. Essas estatísticas são semelhantes às da biblioteca de executores em massa BulkImportResponse.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
O BulkOperationResponse
contém:
- O tempo total necessário para processar a lista de operações através do suporte em massa.
- O número de operações bem-sucedidas.
- O total de unidades de solicitação consumidas.
- Se houver falhas, ele exibirá uma lista de tuplas que contêm a exceção e o item associado para fins de registro e identificação.
Repetir configuração
A biblioteca executora em massa tinha orientações mencionadas para definir o MaxRetryWaitTimeInSeconds
e MaxRetryAttemptsOnThrottledRequests
de RetryOptions para 0
delegar o controle à biblioteca.
Para suporte em massa no SDK do .NET, não há nenhum comportamento oculto. Você pode configurar as opções de repetição diretamente através de CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests e CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Nota
Nos casos em que as unidades de solicitação provisionadas são muito menores do que o esperado com base na quantidade de dados, convém considerar defini-las como valores altos. A operação em massa levará mais tempo, mas tem uma chance maior de ter sucesso completo devido às tentativas mais altas.
Melhoramentos de desempenho
Como em outras operações com o SDK do .NET, o uso das APIs de fluxo resulta em melhor desempenho e evita qualquer serialização desnecessária.
O uso de APIs de fluxo só é possível se a natureza dos dados usados corresponder à de um fluxo de bytes (por exemplo, fluxos de arquivos). Nesses casos, usar os CreateItemStreamAsync
métodos , ReplaceItemStreamAsync
ou DeleteItemStreamAsync
e trabalhar com ResponseMessage
(em vez de) aumenta a taxa de ItemResponse
transferência que pode ser alcançada.
Próximos passos
- Para saber mais sobre as versões do SDK do .NET, consulte o artigo SDK do Azure Cosmos DB .
- Obtenha o código-fonte completo da migração no GitHub.
- Amostras globais adicionais no GitHub
- Tentando fazer o planejamento de capacidade para uma migração para o Azure Cosmos DB?
- Se tudo o que você sabe é o número de vcores e servidores em seu cluster de banco de dados existente, leia sobre como estimar unidades de solicitação usando vCores ou vCPUs
- Se você souber as taxas de solicitação típicas para sua carga de trabalho de banco de dados atual, leia sobre como estimar unidades de solicitação usando o planejador de capacidade do Azure Cosmos DB