Använda .NET-massexekutorbiblioteket för att utföra massåtgärder i Azure Cosmos DB
GÄLLER FÖR: NoSQL
Kommentar
Massexekutorbiblioteket som beskrivs i den här artikeln underhålls för program som använder .NET SDK 2.x-versionen. För nya program kan du använda massstöd som är direkt tillgängligt med .NET SDK version 3.x och det kräver inget externt bibliotek.
Om du för närvarande använder massexekutorbiblioteket och planerar att migrera till massstöd på den nyare SDK:n använder du stegen i migreringsguiden för att migrera ditt program.
Den här självstudien innehåller instruktioner om hur du använder .NET-biblioteket för masskörning för att importera och uppdatera dokument till en Azure Cosmos DB-container. Mer information om massexekutorbiblioteket och hur det hjälper dig att använda massivt dataflöde och lagring finns i översikten över massexekutorbiblioteket i Azure Cosmos DB. I den här självstudien visas ett .NET-exempelprogram där massimport av slumpmässigt genererade dokument till en Azure Cosmos DB-container. När du har importerat data visar biblioteket hur du kan massuppdatera importerade data genom att ange korrigeringar som åtgärder som ska utföras på specifika dokumentfält.
För närvarande stöds massexekutorbiblioteket endast av Azure Cosmos DB för NoSQL och API för Gremlin-konton. I den här artikeln beskrivs hur du använder .NET-massexekutorbiblioteket med API för NoSQL-konton. Information om hur du använder .NET-massexekutorbiblioteket med API för Gremlin-konton finns i Mata in data i grupp i Azure Cosmos DB för Gremlin med hjälp av ett massexekutorbibliotek.
Förutsättningar
Senaste Visual Studio med Azure-utvecklingsarbetsbelastningen. Du kan komma igång med den kostnadsfria Visual Studio Community IDE. Aktivera arbetsbelastningen Azure-utveckling under Visual Studio-installationen.
Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.
Du kan prova Azure Cosmos DB kostnadsfritt utan en Azure-prenumeration. Du kan också installera och använda Azure Cosmos DB-emulatorn för lokal utveckling och testning med
https://localhost:8081
slutpunkten. Primärnyckeln finns i Autentisera begäranden.Skapa ett Azure Cosmos DB för NoSQL-konto med hjälp av stegen som beskrivs i avsnittet Skapa ett Azure Cosmos DB-konto i Snabbstart: Azure Cosmos DB for NoSQL-klientbibliotek för .NET.
Klona exempelprogrammet
Nu ska vi växla till att arbeta med kod genom att ladda ned ett .NET-exempelprogram från GitHub. Det här programmet utför massåtgärder på data som lagras i ditt Azure Cosmos DB-konto. Om du vill klona programmet öppnar du en kommandotolk, navigerar till katalogen där du vill kopiera det och kör följande kommando:
git clone https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started.git
Den klonade lagringsplatsen innehåller två exempel, BulkImportSample och BulkUpdateSample. Du kan öppna något av exempelprogrammen, uppdatera anslutningssträng i App.config-filen med ditt Azure Cosmos DB-kontos anslutningssträng, skapa lösningen och köra den.
BulkImportSample-programmet genererar slumpmässiga dokument och massimporter dem till ditt Azure Cosmos DB-konto. BulkUpdateSample-programmet uppdaterar de importerade dokumenten genom att ange korrigeringar som åtgärder som ska utföras på specifika dokumentfält. I nästa avsnitt granskar du koden i var och en av dessa exempelappar.
Massimportera data till ett Azure Cosmos DB-konto
Gå till mappen BulkImportSample och öppna filen BulkImportSample.sln .
Azure Cosmos DB:s anslutningssträng hämtas från App.config-filen enligt följande kod:
private static readonly string EndpointUrl = ConfigurationManager.AppSettings["EndPointUrl"]; private static readonly string AuthorizationKey = ConfigurationManager.AppSettings["AuthorizationKey"]; private static readonly string DatabaseName = ConfigurationManager.AppSettings["DatabaseName"]; private static readonly string CollectionName = ConfigurationManager.AppSettings["CollectionName"]; private static readonly int CollectionThroughput = int.Parse(ConfigurationManager.AppSettings["CollectionThroughput"]);
Massimportören skapar en ny databas och en container med databasnamnet, containernamnet och dataflödesvärdena som anges i App.config-filen.
Sedan initieras DocumentClient-objektet med Direkt TCP-anslutningsläge:
ConnectionPolicy connectionPolicy = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }; DocumentClient client = new DocumentClient(new Uri(endpointUrl),authorizationKey, connectionPolicy)
BulkExecutor-objektet initieras med ett högt återförsöksvärde för väntetid och begränsade begäranden. Sedan är de inställda på 0 för att överföra överbelastningskontroll till BulkExecutor under dess livslängd.
// Set retry options high during initialization (default values). client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9; IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection); await bulkExecutor.InitializeAsync(); // Set retries to 0 to pass complete control to bulk executor. client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0; client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
Programmet anropar BulkImportAsync-API:et. .NET-biblioteket innehåller två överlagringar av API:et för massimport – ett som accepterar en lista över serialiserade JSON-dokument och det andra som accepterar en lista över deserialiserade POCO-dokument. Mer information om definitionerna för var och en av dessa överlagrade metoder finns i API-dokumentationen.
BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync( documents: documentsToImportInBatch, enableUpsert: true, disableAutomaticIdGeneration: true, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);
BulkImportAsync-metoden accepterar följande parametrar:
Parameter Beskrivning enableUpsert En flagga för att aktivera upsert-åtgärder i dokumenten. Om det redan finns ett dokument med det angivna ID:t uppdateras det. Som standard är den inställd på false. disableAutomaticIdGeneration En flagga för att inaktivera automatisk generering av ID. Som standard är det inställt på sant. maxConcurrencyPerPartitionKeyRange Den maximala graden av samtidighet per partitionsnyckelintervall. Om du anger null används standardvärdet 20 i biblioteket. maxInMemorySortingBatchSize Det maximala antalet dokument som hämtas från dokumentuppräknaren, som skickas till API-anropet i varje steg. För den minnesinterna sorteringsfasen som sker innan massimporten. Om du anger den här parametern till null används standardvärdet för minimum (documents.count, 1000000). cancellationToken Annulleringstoken för att avsluta massimportåtgärden på ett korrekt sätt.
Definition av massimportsvarsobjekt
Resultatet av API-anropet för massimport innehåller följande attribut:
Parameter | Beskrivning |
---|---|
NumberOfDocumentsImporterad (lång) | Det totala antalet dokument som har importerats av det totala antalet dokument som skickats till API-anropet för massimport. |
TotalRequestUnitsConsumed (dubbelt) | Totalt antal enheter för begäran (RU) som förbrukas av API-anropet för massimport. |
TotalTimeTaken (TimeSpan) | Den totala tid det tar för API-anropet för massimport att slutföra körningen. |
BadInputDocuments (listobjekt><) | Listan över dokument i felaktigt format som inte har importerats i API-anropet för massimport. Åtgärda de dokument som returneras och försök importera igen. Ogiltigt formaterade dokument innehåller dokument vars ID-värde inte är en sträng (null eller någon annan datatyp anses vara ogiltig). |
Massuppdatering av data i ditt Azure Cosmos DB-konto
Du kan uppdatera befintliga dokument med hjälp av BulkUpdateAsync-API:et. I det här exemplet anger Name
du fältet till ett nytt värde och tar bort fältet Description
från de befintliga dokumenten. Fullständig uppsättning uppdateringsåtgärder som stöds finns i API-dokumentationen.
Gå till mappen BulkUpdateSample och öppna filen BulkUpdateSample.sln .
Definiera uppdateringsobjekten tillsammans med motsvarande fältuppdateringsåtgärder. I det här exemplet använder du SetUpdateOperation för att uppdatera
Name
fältet och UnsetUpdateOperation för att ta bort fältetDescription
från alla dokument. Du kan också utföra andra åtgärder som att öka ett dokumentfält med ett specifikt värde, push-överföra specifika värden till ett matrisfält eller ta bort ett specifikt värde från ett matrisfält. Mer information om olika metoder som tillhandahålls av massuppdaterings-API:et finns i API-dokumentationen.SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc"); UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description"); List<UpdateOperation> updateOperations = new List<UpdateOperation>(); updateOperations.Add(nameUpdate); updateOperations.Add(descriptionUpdate); List<UpdateItem> updateItems = new List<UpdateItem>(); for (int i = 0; i < 10; i++) { updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations)); }
Programmet anropar BulkUpdateAsync-API:et. Mer information om definitionen av metoden BulkUpdateAsync finns i API-dokumentationen.
BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync( updateItems: updateItems, maxConcurrencyPerPartitionKeyRange: null, maxInMemorySortingBatchSize: null, cancellationToken: token);
BulkUpdateAsync-metoden accepterar följande parametrar:
Parameter Beskrivning maxConcurrencyPerPartitionKeyRange Den maximala graden av samtidighet per partitionsnyckelintervall. Om den här parametern anges till null använder biblioteket standardvärdet(20). maxInMemorySortingBatchSize Det maximala antalet uppdateringsobjekt som hämtas från uppräknaren för uppdateringsobjekt som skickas till API-anropet i varje steg. För den minnesinterna sorteringsfasen som sker innan massuppdateringen. Om den här parametern ställs in på null används standardvärdet för biblioteket (updateItems.count, 1000000). cancellationToken Annulleringstoken för att avsluta massuppdateringsåtgärden på ett korrekt sätt.
Definition av massuppdateringssvarsobjekt
Resultatet av API-anropet för massuppdatering innehåller följande attribut:
Parameter | Beskrivning |
---|---|
NumberOfDocumentsUpdated (lång) | Antalet dokument som har uppdaterats av det totala antalet dokument som har skickats till API-anropet för massuppdatering. |
TotalRequestUnitsConsumed (dubbelt) | Det totala antalet enheter för programbegäran (RU:er) som förbrukas av API-anropet för massuppdatering. |
TotalTimeTaken (TimeSpan) | Den totala tid det tar för API-anropet för massuppdatering att slutföra körningen. |
Prestandatips
Överväg följande punkter för bättre prestanda när du använder massexekutorbiblioteket:
För bästa prestanda kör du ditt program från en virtuell Azure-dator som finns i samma region som ditt Azure Cosmos DB-kontos skrivregion.
Vi rekommenderar att du instansierar ett enda BulkExecutor-objekt för hela programmet på en enda virtuell dator som motsvarar en specifik Azure Cosmos DB-container.
En enda massåtgärds-API-körning förbrukar en stor del av klientdatorns CPU och nätverks-I/O när flera uppgifter skapas internt. Undvik att skapa flera samtidiga uppgifter i din programprocess som kör API-anrop för massåtgärder. Om ett API-anrop med en enda massåtgärd som körs på en enda virtuell dator inte kan förbruka hela containerns dataflöde (om containerns dataflöde > är 1 miljon RU/s) föredrar vi att skapa separata virtuella datorer för att köra API-anropen för massåtgärder samtidigt.
InitializeAsync()
Kontrollera att metoden anropas efter instansiering av ett BulkExecutor-objekt för att hämta azure Cosmos DB-målcontainerns partitionskarta.Kontrollera att gcServer är aktiverat för bättre prestanda i programmets App.Config
<runtime> <gcServer enabled="true" /> </runtime>
Biblioteket genererar spårningar som kan samlas in i en loggfil eller i -konsolen. Om du vill aktivera båda lägger du till följande kod i programmets App.Config-fil :
<system.diagnostics> <trace autoflush="false" indentsize="4"> <listeners> <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" /> <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" /> </listeners> </trace> </system.diagnostics>