Delen via


Gegevens opnemen met behulp van de Kusto .NET SDK

Er zijn twee clientbibliotheken voor .NET: een opnamebibliotheek en een gegevensbibliotheek. Zie over .NET SDK voor meer informatie over .NET SDK. Met deze bibliotheken kunt u gegevens opnemen (laden) in een cluster en gegevens bevragen vanuit uw code. In dit artikel maakt u eerst een tabel en gegevenstoewijzing in een testcluster. Vervolgens plaatst u een gegevensopname in het cluster in de wachtrij en valideert u de resultaten.

Vereisten

  • Een Microsoft-account of een Microsoft Entra gebruikersidentiteit. Er is geen Azure-abonnement vereist.
  • Een cluster en database. Maak een cluster en database.

De opnamebibliotheek installeren

Install-Package Microsoft.Azure.Kusto.Ingest

Verificatie toevoegen en verbindingsreeks maken

Verificatie

Voor het verifiëren van een toepassing gebruikt de SDK uw Microsoft Entra tenant-id. Om uw tenant-id te vinden, gebruikt u de volgende URL, waarbij u YourDomain vervangt door uw domeinnaam.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Dus als uw domein contoso.com is, wordt de URL bijvoorbeeld: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klik op deze URL om de resultaten weer te geven. De eerste regel is als volgt.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

De tenant-id is in dit geval 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

In dit voorbeeld wordt een interactieve Microsoft Entra gebruikersverificatie gebruikt om toegang te krijgen tot het cluster. U kunt ook Microsoft Entra toepassingsverificatie gebruiken met certificaat of toepassingsgeheim. Zorg ervoor dat u de juiste waarden voor tenantId en clusterUri instelt voordat u deze code uitvoert.

De SDK biedt een handige manier om de verificatiemethode in te stellen als onderdeel van de verbindingsreeks. Zie verbindingsreeksen voor de volledige documentatie over verbindingsreeksen.

Notitie

De huidige versie van de SDK biedt geen ondersteuning voor interactieve gebruikersverificatie op .NET Core. Gebruik indien nodig Microsoft Entra gebruikersnaam/wachtwoord of toepassingsverificatie.

De verbindingsreeks samenstellen

U kunt nu de verbindingsreeks maken. U maakt de doeltabel en toewijzing in een latere stap.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Gegevens van bronbestand instellen

Stel het pad in voor het bronbestand. In dit voorbeeld wordt een voorbeeldbestand gebruikt dat wordt gehost in Azure Blob Storage. De StormEvents-voorbeeldgegevensset bevat weergerelateerde gegevens van de National Centers for Environmental Information.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Een tabel maken in het testcluster

Maak een tabel met de naam StormEvents die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv.

Tip

Met de volgende codefragmenten wordt voor bijna elke aanroep een exemplaar van een client gemaakt. Dit wordt gedaan om elk fragment afzonderlijk uitvoerbaar te maken. In productie worden de clientexemplaren opnieuw gebruikt en moeten ze zo lang worden bewaard als nodig is. Eén clientexemplaren per URI zijn voldoende, zelfs wanneer u met meerdere databases werkt (database kan worden opgegeven op opdrachtniveau).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Toewijzing van opname definiëren

Wijs de binnenkomende CSV-gegevens toe aan de kolomnamen die zijn gebruikt bij het maken van de tabel. Richt een CSV-kolomtoewijzingsobject in die tabel in.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Batchbeleid voor uw tabel definiëren

Door binnenkomende gegevens in batches te batchen, wordt de shardgrootte van de gegevens geoptimaliseerd, die wordt beheerd door het beleid voor opnamebatches. Wijzig het beleid met de opdracht voor het beheer van het opnamebatchbeleid. Gebruik dit beleid om de latentie van langzaam binnenkomende gegevens te verminderen.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

We raden u aan een Raw Data Size waarde te definiëren voor opgenomen gegevens en de grootte stapsgewijs te verkleinen naar 250 MB, terwijl u controleert of de prestaties verbeteren.

U kunt de Flush Immediately eigenschap gebruiken om batchverwerking over te slaan, hoewel dit niet wordt aanbevolen voor grootschalige opname, omdat dit slechte prestaties kan veroorzaken.

Bericht in wachtrij zetten voor opname

Een bericht in de wachtrij plaatsen om gegevens op te halen uit blobopslag en de gegevens op te nemen. Er wordt een verbinding tot stand gebracht met het opnamecluster en er wordt een andere client gemaakt om met dat eindpunt te werken.

Tip

Met de volgende codefragmenten wordt voor bijna elke aanroep een exemplaar van een client gemaakt. Dit wordt gedaan om elk fragment afzonderlijk uitvoerbaar te maken. In productie worden de clientexemplaren opnieuw gebruikt en moeten ze zo lang worden bewaard als nodig is. Eén clientexemplaren per URI zijn voldoende, zelfs wanneer u met meerdere databases werkt (database kan worden opgegeven op opdrachtniveau).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Controleren of gegevens zijn opgenomen in de tabel

Wacht vijf tot tien minuten totdat de opname in de wachtrij de opname heeft gepland en de gegevens in uw cluster zijn geladen. Voer vervolgens de volgende code uit om het aantal records in de tabel StormEvents te bepalen.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Query's voor probleemoplossing uitvoeren

Meld u aan bij https://dataexplorer.azure.com en maak verbinding met uw cluster. Voer de volgende opdracht uit in uw database om te zien of er in de afgelopen vier uur fouten zijn opgetreden tijdens het opnemen van gegevens. Vervang de naam van de database voordat u de opdracht uitvoert.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Voer de volgende opdracht uit om de status op te vragen van alle bewerkingen voor het opnemen van gegevens van de afgelopen vier uur. Vervang de naam van de database voordat u de opdracht uitvoert.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Resources opschonen

Als u van plan bent onze andere artikelen te volgen, behoudt u de resources die u hebt gemaakt. Voer anders de volgende opdracht uit in uw database om de tabel StormEvents op te schonen.

.drop table StormEvents