Udostępnij za pośrednictwem


Pozyskiwanie danych przy użyciu zestawu SDK platformy .NET Kusto

Istnieją dwie biblioteki klienta dla platformy .NET: biblioteka pozyskiwania i biblioteka danych. Aby uzyskać więcej informacji na temat zestawu .NET SDK, zobacz about .NET SDK (Zestaw SDK platformy .NET). Te biblioteki umożliwiają pozyskiwanie (ładowanie) danych do klastra i wykonywanie zapytań o dane z kodu. W tym artykule najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie umieścisz pozyskiwanie w kolejce do klastra i sprawdzisz poprawność wyników.

Wymagania wstępne

  • Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
  • Klaster i baza danych. Utwórz klaster i bazę danych.

Instalowanie biblioteki pozyskiwania

Install-Package Microsoft.Azure.Kusto.Ingest

Dodawanie uwierzytelniania i konstruowanie parametry połączenia

Uwierzytelnianie

Aby uwierzytelnić aplikację, zestaw SDK używa identyfikatora dzierżawy firmy Microsoft Entra. Aby odnaleźć identyfikator dzierżawy, użyj następującego adresu URL, zastępując ciąg YourDomain nazwą domeny.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Jeśli na przykład Twoja domena to contoso.com, adres URL to https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Kliknij ten adres URL, aby wyświetlić wyniki. Pierwszy wiersz wygląda w następujący sposób.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Identyfikator dzierżawy w tym przypadku to 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

W tym przykładzie użyto interakcyjnego uwierzytelniania użytkownika microsoft Entra w celu uzyskania dostępu do klastra. Możesz również użyć uwierzytelniania aplikacji Entra firmy Microsoft z certyfikatem lub wpisem tajnym aplikacji. Przed uruchomieniem tego kodu upewnij się, że ustawiono poprawne wartości dla tenantId i clusterUri .

Zestaw SDK zapewnia wygodny sposób konfigurowania metody uwierzytelniania w ramach parametry połączenia. Aby uzyskać pełną dokumentację dotyczącą parametry połączenia, zobacz parametry połączenia s.

Uwaga

Bieżąca wersja zestawu SDK nie obsługuje uwierzytelniania interakcyjnego użytkownika na platformie .NET Core. Jeśli jest to wymagane, zamiast tego użyj nazwy użytkownika/hasła firmy Microsoft lub uwierzytelniania aplikacji.

Tworzenie parametrów połączenia

Teraz możesz skonstruować parametry połączenia. Utworzysz tabelę docelową i mapujesz w późniejszym kroku.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Ustawianie informacji o pliku źródłowym

Ustaw ścieżkę dla pliku źródłowego. W tym przykładzie używany jest przykładowy plik hostowany w usłudze Azure Blob Storage. Przykładowy zestaw danych StormEvents zawiera dane związane z pogodą z National Centers for Environmental Information.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Tworzenie tabeli w klastrze testowym

Utwórz tabelę o nazwie StormEvents, która będzie zgodna ze schematem danych w pliku StormEvents.csv.

Napiwek

Poniższe fragmenty kodu tworzą wystąpienie klienta dla niemal każdego wywołania. Należy to zrobić, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na identyfikator URI jest wystarczające, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie polecenia).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definiowanie mapowania pozyskiwania

Zamapuj przychodzące dane CSV na nazwy kolumn używane podczas tworzenia tabeli. Aprowizuj obiekt mapowania kolumn CSV w tej tabeli.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definiowanie zasad dzielenia na partie dla tabeli

Przetwarzanie wsadowe danych przychodzących optymalizuje rozmiar fragmentów danych, który jest kontrolowany przez zasady dzielenia na partie pozyskiwania. Zmodyfikuj zasady za pomocą polecenia zarządzania zasadami dzielenia na partie pozyskiwania. Użyj tych zasad, aby zmniejszyć opóźnienie powoli przychodzących danych.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Zalecamy zdefiniowanie Raw Data Size wartości pozyskanych danych i przyrostowe zmniejszenie rozmiaru w kierunku 250 MB przy jednoczesnym sprawdzeniu, czy wydajność się poprawia.

Za pomocą Flush Immediately właściwości można pominąć przetwarzanie wsadowe, chociaż nie jest to zalecane w przypadku pozyskiwania na dużą skalę, ponieważ może to spowodować niską wydajność.

Wysyłanie komunikatu do kolejki w celu pozyskiwania

Kolejkowanie komunikatu w celu ściągnięcia danych z magazynu obiektów blob i pozyskiwania danych. Nawiązane jest połączenie z klastrem pozyskiwania, a inny klient jest tworzony w celu pracy z tym punktem końcowym.

Napiwek

Poniższe fragmenty kodu tworzą wystąpienie klienta dla niemal każdego wywołania. Należy to zrobić, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na identyfikator URI jest wystarczające, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie polecenia).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Weryfikowanie, czy dane zostały pozyskane do tabeli

Poczekaj od pięciu do dziesięciu minut na zaplanowanie pozyskiwania w kolejce i załadowanie danych do klastra. Następnie uruchom następujący kod, aby uzyskać liczbę rekordów w tabeli StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Uruchamianie zapytań dotyczących rozwiązywania problemów

Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem. Uruchom następujące polecenie w bazie danych, aby sprawdzić, czy wystąpiły jakieś niepowodzenia pozyskiwania w ciągu ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Uruchom następujące polecenie, aby wyświetlić stan wszystkich operacji pozyskiwania z ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Czyszczenie zasobów

Jeśli planujesz postępować zgodnie z innymi artykułami, zachowaj utworzone zasoby. W przeciwnym razie uruchom następujące polecenie w bazie danych, aby wyczyścić tabelę StormEvents.

.drop table StormEvents