Migrace z knihovny Bulk Executor na hromadnou podporu v sadě .NET SDK služby Azure Cosmos DB v3
PLATÍ PRO: NoSQL
Tento článek popisuje požadované kroky k migraci kódu existující aplikace, který používá knihovnu bulk Executor .NET k funkci hromadné podpory v nejnovější verzi sady .NET SDK.
Povolení hromadné podpory
Povolení hromadné podpory instance CosmosClient
prostřednictvím konfigurace AllowBulkExecution :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Vytvoření úkolů pro každou operaci
Hromadná podpora v sadě .NET SDK funguje s využitím paralelní knihovny úloh a operací seskupování, ke kterým dochází souběžně.
Sada SDK neobsahuje jedinou metodu, která jako vstupní parametr vezme seznam dokumentů nebo operací, ale potřebujete vytvořit úlohu pro každou operaci, kterou chcete hromadně spustit, a pak jednoduše počkat, až se dokončí.
Pokud je například počáteční vstup seznamem položek, ve kterých má každá položka následující schéma:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Pokud chcete provést hromadný import (podobně jako bulkExecutor.BulkImportAsync), musíte mít souběžná volání CreateItemAsync
. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Pokud chcete provést hromadnou aktualizaci (podobně jako pomocí BulkExecutor.BulkUpdateAsync), musíte mít souběžná volání metody ReplaceItemAsync
po aktualizaci hodnoty položky. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
A pokud chcete provést hromadné odstranění (podobně jako bulkExecutor.BulkDeleteAsync), musíte mít souběžná volání DeleteItemAsync
, s klíčem oddílu id
každé položky. Příklad:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Zaznamenat stav výsledku úkolu
V předchozích příkladech kódu jsme vytvořili souběžný seznam úkolů a pro každou z těchto úloh jsme volali metodu CaptureOperationResponse
. Tato metoda je rozšíření, které nám umožňuje udržovat podobné schéma odpovědí jako BulkExecutor tím, že zachytává všechny chyby a sleduje využití jednotek žádostí.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
OperationResponse
Kde je deklarován jako:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Souběžné spouštění operací
Ke sledování rozsahu celého seznamu úkolů používáme tuto pomocnou třídu:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Metoda ExecuteAsync
počká, dokud se nedokončí všechny operace, a můžete ji použít takto:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Zachytávání statistik
Předchozí kód čeká na dokončení všech operací a vypočítá požadované statistiky. Tyto statistiky jsou podobné této knihovně BulkImportResponse knihovny BulkImportResponse.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Obsahuje BulkOperationResponse
:
- Celková doba potřebná ke zpracování seznamu operací prostřednictvím hromadné podpory.
- Počet úspěšných operací.
- Celkový počet spotřebovaných jednotek žádostí
- Pokud dojde k selhání, zobrazí se seznam řazených kolekcí členů, které obsahují výjimku, a přidruženou položku pro účely protokolování a identifikace.
Konfigurace opakování
Knihovna Bulk Executor obsahovala pokyny, které se zmínily o nastavení MaxRetryWaitTimeInSeconds
a MaxRetryAttemptsOnThrottledRequests
opakování, aby 0
delegovala řízení do knihovny.
Pro hromadnou podporu v sadě .NET SDK neexistuje žádné skryté chování. Možnosti opakování můžete nakonfigurovat přímo prostřednictvím CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Poznámka:
V případech, kdy jsou zřízené jednotky žádostí mnohem nižší než očekávané na základě množství dat, můžete zvážit nastavení těchto jednotek na vysoké hodnoty. Hromadná operace bude trvat déle, ale má větší šanci na to, že úspěšně proběhne kvůli vyšším opakovaným pokusům.
Zlepšení výkonu
Stejně jako u jiných operací se sadou .NET SDK má použití rozhraní API datových proudů za následek lepší výkon a vyhnout se zbytečnému serializaci.
Použití rozhraní API streamu je možné pouze v případě, že povaha dat, která používáte, odpovídá datovému proudu bajtů (například datových proudů souborů). Vtakovýchch CreateItemStreamAsync
ReplaceItemStreamAsync
DeleteItemStreamAsync
ResponseMessage
ItemResponse
Další kroky
- Další informace o vydaných verzích sady .NET SDK najdete v článku o sadě SDK služby Azure Cosmos DB.
- Získejte úplný zdrojový kód migrace z GitHubu.
- Další hromadné ukázky na GitHubu
- Pokoušíte se naplánovat kapacitu migrace do služby Azure Cosmos DB?
- Pokud víte, že je počet virtuálních jader a serverů ve vašem existujícím databázovém clusteru, přečtěte si o odhadu jednotek žádostí pomocí virtuálních jader nebo virtuálních procesorů.
- Pokud znáte typické sazby požadavků pro vaši aktuální úlohu databáze, přečtěte si informace o odhadu jednotek žádostí pomocí plánovače kapacity služby Azure Cosmos DB.