Migrera från massexekutorbiblioteket till masssupporten i Azure Cosmos DB .NET V3 SDK
GÄLLER FÖR: NoSQL
Den här artikeln beskriver de steg som krävs för att migrera ett befintligt programs kod som använder .NET-massexekutorbiblioteket till masssupportfunktionen i den senaste versionen av .NET SDK.
Aktivera massstöd
Aktivera massstöd för instansen CosmosClient
via AllowBulkExecution-konfigurationen :
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Skapa uppgifter för varje åtgärd
Massstöd i .NET SDK fungerar genom att använda det parallella aktivitetsbiblioteket och grupperingsåtgärder som utförs samtidigt.
Det finns ingen enskild metod i SDK som tar din lista över dokument eller åtgärder som en indataparameter, utan du behöver skapa en aktivitet för varje åtgärd som du vill köra i grupp och sedan bara vänta tills de har slutförts.
Om dina första indata till exempel är en lista över objekt där varje objekt har följande schema:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Om du vill massimportera (liknar att använda BulkExecutor.BulkImportAsync) måste du ha samtidiga anrop till CreateItemAsync
. Till exempel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Om du vill göra massuppdateringar (ungefär som med BulkExecutor.BulkUpdateAsync) måste du ha samtidiga anrop till ReplaceItemAsync
metoden när du har uppdaterat objektvärdet. Till exempel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Och om du vill göra massborttagning (liknar att använda BulkExecutor.BulkDeleteAsync) måste du ha samtidiga anrop till DeleteItemAsync
, med partitionsnyckeln id
och för varje objekt. Till exempel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Resultattillstånd för avbildningsaktivitet
I föregående kodexempel har vi skapat en samtidig lista över uppgifter och anropat CaptureOperationResponse
metoden för var och en av dessa uppgifter. Den här metoden är ett tillägg som gör att vi kan upprätthålla ett liknande svarsschema som BulkExecutor genom att samla in eventuella fel och spåra användningen av enheter för begäranden.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
OperationResponse
Om deklareras som:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Köra åtgärder samtidigt
För att spåra omfånget för hela listan med uppgifter använder vi den här hjälpklassen:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Metoden ExecuteAsync
väntar tills alla åtgärder har slutförts och du kan använda den så här:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Samla in statistik
Den tidigare koden väntar tills alla åtgärder har slutförts och beräknar den statistik som krävs. Den här statistiken liknar den för massexekutorbibliotekets BulkImportResponse.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Innehåller BulkOperationResponse
:
- Den totala tid det tar att bearbeta listan över åtgärder via massstöd.
- Antalet lyckade åtgärder.
- Det totala antalet förbrukade enheter för begäran.
- Om det uppstår fel visas en lista över tupplar som innehåller undantaget och det associerade objektet för loggning och identifiering.
Försök konfigurera igen
Massexekutorbiblioteket hade vägledning som nämndes för att ange MaxRetryWaitTimeInSeconds
och MaxRetryAttemptsOnThrottledRequests
för RetryOptions för att 0
delegera kontroll till biblioteket.
För massstöd i .NET SDK finns det inget dolt beteende. Du kan konfigurera återförsöksalternativen direkt via CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests och CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.
Kommentar
Om enheterna för etablerade begäranden är mycket lägre än förväntat baserat på mängden data kan du överväga att ställa in dessa till höga värden. Massåtgärden tar längre tid, men det har större chans att lyckas helt på grund av de högre återförsöken.
Prestandaförbättringar
Precis som med andra åtgärder med .NET SDK ger användning av stream-API:er bättre prestanda och undviker onödig serialisering.
Det går bara att använda stream-API:er om typen av data som du använder matchar dataströmmen med byte (till exempel filströmmar). I sådana fall ökar användningen av CreateItemStreamAsync
metoderna , ReplaceItemStreamAsync
eller DeleteItemStreamAsync
och arbetar med ResponseMessage
(i stället för ItemResponse
) det dataflöde som kan uppnås.
Nästa steg
- Mer information om .NET SDK-versionerna finns i artikeln Azure Cosmos DB SDK .
- Hämta den fullständiga källkoden för migrering från GitHub.
- Ytterligare massexempel på GitHub
- Försöker du planera kapacitet för en migrering till Azure Cosmos DB?
- Om allt du vet är antalet virtuella kärnor och servrar i ditt befintliga databaskluster läser du om att uppskatta enheter för begäranden med virtuella kärnor eller virtuella kärnor
- Om du känner till vanliga begärandefrekvenser för din aktuella databasarbetsbelastning kan du läsa om att uppskatta enheter för begäranden med azure Cosmos DB-kapacitetshanteraren