Udostępnij za pośrednictwem


Pozyskiwanie danych przy użyciu zestawu Azure Data Explorer Go SDK

Azure Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dziennika i telemetrycznych. Udostępnia bibliotekę klienta zestawu GO SDK do interakcji z usługą Azure Data Explorer. Zestaw SDK języka Go umożliwia pozyskiwanie, kontrolowanie i wykonywanie zapytań dotyczących danych w klastrach usługi Azure Data Explorer.

W tym artykule najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie należy w kolejce pozyskiwanie do klastra przy użyciu zestawu SDK języka Go i zweryfikować wyniki.

Wymagania wstępne

Instalowanie zestawu SDK języka Go

Zestaw AZURE Data Explorer Go SDK zostanie automatycznie zainstalowany podczas uruchamiania [przykładowej aplikacji korzystającej z modułów Języka Go. Jeśli zestaw SDK języka Go został zainstalowany dla innej aplikacji, utwórz moduł Go i pobierz pakiet usługi Azure Data Explorer (przy użyciu narzędzia go get), na przykład:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Zależność pakietu zostanie dodana do go.mod pliku. Użyj go w aplikacji Języka Go.

Przeglądanie kodu

Ta sekcja Przejrzyj kod jest opcjonalna. Jeśli chcesz dowiedzieć się, jak działa kod, możesz przejrzeć następujące fragmenty kodu. W przeciwnym razie możesz przejść do sekcji Uruchamianie aplikacji.

Uwierzytelnij

Program musi uwierzytelnić się w usłudze Azure Data Explorer przed wykonaniem jakichkolwiek operacji.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Wystąpienie usługi kusto. Autoryzacja jest tworzona przy użyciu poświadczeń jednostki usługi. Następnie służy do tworzenia kusto. Klient z funkcją New , która akceptuje również punkt końcowy klastra.

Utwórz tabelę

Polecenie create table jest reprezentowane przez instrukcję Kusto. Funkcja Mgmt służy do wykonywania poleceń zarządzania. Służy do wykonywania polecenia w celu utworzenia tabeli.

func createTable(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
  if err != nil {
    log.Fatal("failed to create table", err)
  }
  log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Napiwek

Instrukcja Kusto jest domyślnie stała, aby uzyskać lepsze zabezpieczenia. NewStmt akceptuje stałe ciągów. Interfejs UnsafeStmt API umożliwia używanie segmentów instrukcji niestałych, ale nie jest zalecane.

Polecenie Kusto create table jest następujące:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Tworzenie mapowania

Mapowania danych są używane podczas pozyskiwania do mapowania danych przychodzących na kolumny w tabelach usługi Azure Data Explorer. Aby uzyskać więcej informacji, zobacz mapowanie danych. Mapowanie jest tworzone w taki sam sposób jak tabela przy użyciu Mgmt funkcji z nazwą bazy danych i odpowiednim poleceniem. Pełne polecenie jest dostępne w repozytorium GitHub dla przykładu.

func createMapping(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
  if err != nil {
    log.Fatal("failed to create mapping - ", err)
  }
  log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Pozyskiwanie danych

Pozyskiwanie jest kolejkowane przy użyciu pliku z istniejącego kontenera usługi Azure Blob Storage.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
  kIngest, err := ingest.New(kc, kustoDB, kustoTable)
  if err != nil {
    log.Fatal("failed to create ingestion client", err)
  }
  blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
  err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

  if err != nil {
    log.Fatal("failed to ingest file", err)
  }
  log.Println("Ingested file from -", blobStorePath)
}

Klient pozyskiwania jest tworzony przy użyciu pozyskiwania. Nowy. Funkcja FromFile służy do odwoływania się do identyfikatora URI usługi Azure Blob Storage. Nazwa odwołania do mapowania i typ danych są przekazywane w postaci FileOption.

Uruchamianie aplikacji

  1. Sklonuj przykładowy kod z usługi GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Uruchom przykładowy kod, jak pokazano w tym fragmencie kodu z pliku main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Napiwek

    Aby wypróbować różne kombinacje operacji, możesz usunąć komentarz/oznaczyć odpowiednie funkcje w pliku main.go.

    Po uruchomieniu przykładowego kodu są wykonywane następujące akcje:

    1. Usuwanie tabeli: StormEvents tabela jest usuwana (jeśli istnieje).
    2. Tworzenie tabeli: StormEvents tworzona jest tabela.
    3. Tworzenie mapowania: StormEvents_CSV_Mapping mapowanie jest tworzone.
    4. Pozyskiwanie plików: plik CSV (w usłudze Azure Blob Storage) jest kolejkowany do pozyskiwania.
  3. Aby utworzyć jednostkę usługi na potrzeby uwierzytelniania, użyj interfejsu wiersza polecenia platformy Azure za pomocą polecenia az ad sp create-for-rbac . Ustaw informacje o jednostce usługi z punktem końcowym klastra i nazwą bazy danych w postaci zmiennych środowiskowych, które będą używane przez program:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Uruchom program:

    go run main.go
    

    Uzyskasz podobne dane wyjściowe:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Weryfikowanie i rozwiązywanie problemów

Poczekaj od 5 do 10 minut na zaplanowanie procesu pozyskiwania w kolejce i załadowanie danych do usługi Azure Data Explorer.

  1. Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem. Następnie uruchom następujące polecenie, aby uzyskać liczbę rekordów w StormEvents tabeli.

    StormEvents | count
    
  2. Uruchom następujące polecenie w bazie danych, aby sprawdzić, czy wystąpiły jakieś niepowodzenia pozyskiwania w ciągu ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Uruchom następujące polecenie, aby wyświetlić stan wszystkich operacji pozyskiwania z ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Czyszczenie zasobów

Jeśli planujesz postępować zgodnie z innymi artykułami, zachowaj utworzone zasoby. Jeśli nie, uruchom następujące polecenie w bazie danych, aby usunąć tabelę StormEvents .

.drop table StormEvents

Następny krok