Dela via


Migrera från massexekutorbiblioteket till masssupporten i Azure Cosmos DB .NET V3 SDK

GÄLLER FÖR: NoSQL

Den här artikeln beskriver de steg som krävs för att migrera ett befintligt programs kod som använder .NET-massexekutorbiblioteket till masssupportfunktionen i den senaste versionen av .NET SDK.

Aktivera massstöd

Aktivera massstöd för instansen CosmosClient via AllowBulkExecution-konfigurationen :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Skapa uppgifter för varje åtgärd

Massstöd i .NET SDK fungerar genom att använda det parallella aktivitetsbiblioteket och grupperingsåtgärder som utförs samtidigt.

Det finns ingen enskild metod i SDK som tar din lista över dokument eller åtgärder som en indataparameter, utan du behöver skapa en aktivitet för varje åtgärd som du vill köra i grupp och sedan bara vänta tills de har slutförts.

Om dina första indata till exempel är en lista över objekt där varje objekt har följande schema:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Om du vill massimportera (liknar att använda BulkExecutor.BulkImportAsync) måste du ha samtidiga anrop till CreateItemAsync. Till exempel:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Om du vill göra massuppdateringar (ungefär som med BulkExecutor.BulkUpdateAsync) måste du ha samtidiga anrop till ReplaceItemAsync metoden när du har uppdaterat objektvärdet. Till exempel:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Och om du vill göra massborttagning (liknar att använda BulkExecutor.BulkDeleteAsync) måste du ha samtidiga anrop till DeleteItemAsync, med partitionsnyckeln id och för varje objekt. Till exempel:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Resultattillstånd för avbildningsaktivitet

I föregående kodexempel har vi skapat en samtidig lista över uppgifter och anropat CaptureOperationResponse metoden för var och en av dessa uppgifter. Den här metoden är ett tillägg som gör att vi kan upprätthålla ett liknande svarsschema som BulkExecutor genom att samla in eventuella fel och spåra användningen av enheter för begäranden.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse Om deklareras som:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Köra åtgärder samtidigt

För att spåra omfånget för hela listan med uppgifter använder vi den här hjälpklassen:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Metoden ExecuteAsync väntar tills alla åtgärder har slutförts och du kan använda den så här:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Samla in statistik

Den tidigare koden väntar tills alla åtgärder har slutförts och beräknar den statistik som krävs. Den här statistiken liknar den för massexekutorbibliotekets BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Innehåller BulkOperationResponse :

  1. Den totala tid det tar att bearbeta listan över åtgärder via massstöd.
  2. Antalet lyckade åtgärder.
  3. Det totala antalet förbrukade enheter för begäran.
  4. Om det uppstår fel visas en lista över tupplar som innehåller undantaget och det associerade objektet för loggning och identifiering.

Försök konfigurera igen

Massexekutorbiblioteket hade vägledning som nämndes för att ange MaxRetryWaitTimeInSeconds och MaxRetryAttemptsOnThrottledRequests för RetryOptions för att 0 delegera kontroll till biblioteket.

För massstöd i .NET SDK finns det inget dolt beteende. Du kan konfigurera återförsöksalternativen direkt via CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests och CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Kommentar

Om enheterna för etablerade begäranden är mycket lägre än förväntat baserat på mängden data kan du överväga att ställa in dessa till höga värden. Massåtgärden tar längre tid, men det har större chans att lyckas helt på grund av de högre återförsöken.

Prestandaförbättringar

Precis som med andra åtgärder med .NET SDK ger användning av stream-API:er bättre prestanda och undviker onödig serialisering.

Det går bara att använda stream-API:er om typen av data som du använder matchar dataströmmen med byte (till exempel filströmmar). I sådana fall ökar användningen av CreateItemStreamAsyncmetoderna , ReplaceItemStreamAsynceller DeleteItemStreamAsync och arbetar med ResponseMessage (i stället för ItemResponse) det dataflöde som kan uppnås.

Nästa steg