Pozyskiwanie danych przy użyciu zestawu Azure Data Explorer Go SDK
Azure Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dziennika i telemetrycznych. Udostępnia bibliotekę klienta zestawu GO SDK do interakcji z usługą Azure Data Explorer. Zestaw SDK języka Go umożliwia pozyskiwanie, kontrolowanie i wykonywanie zapytań dotyczących danych w klastrach usługi Azure Data Explorer.
W tym artykule najpierw utworzysz tabelę i mapowanie danych w klastrze testowym. Następnie należy w kolejce pozyskiwanie do klastra przy użyciu zestawu SDK języka Go i zweryfikować wyniki.
Wymagania wstępne
- Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
- Baza danych i klaster usługi Azure Data Explorer. Utwórz klaster i bazę danych.
- Zainstalowanie oprogramowania Git.
- Zainstaluj język Go z następującymi minimalnymi wymaganiami dotyczącymi zestawu SDK języka Go.
- Utwórz rejestrację aplikacji i przyznaj jej uprawnienia do bazy danych. Zapisz identyfikator klienta i klucz tajny klienta do późniejszego użycia.
Instalowanie zestawu SDK języka Go
Zestaw AZURE Data Explorer Go SDK zostanie automatycznie zainstalowany podczas uruchamiania [przykładowej aplikacji korzystającej z modułów Języka Go. Jeśli zestaw SDK języka Go został zainstalowany dla innej aplikacji, utwórz moduł Go i pobierz pakiet usługi Azure Data Explorer (przy użyciu narzędzia go get
), na przykład:
go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto
Zależność pakietu zostanie dodana do go.mod
pliku. Użyj go w aplikacji Języka Go.
Przeglądanie kodu
Ta sekcja Przejrzyj kod jest opcjonalna. Jeśli chcesz dowiedzieć się, jak działa kod, możesz przejrzeć następujące fragmenty kodu. W przeciwnym razie możesz przejść do sekcji Uruchamianie aplikacji.
Uwierzytelnij
Program musi uwierzytelnić się w usłudze Azure Data Explorer przed wykonaniem jakichkolwiek operacji.
auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)
Wystąpienie usługi kusto. Autoryzacja jest tworzona przy użyciu poświadczeń jednostki usługi. Następnie służy do tworzenia kusto. Klient z funkcją New , która akceptuje również punkt końcowy klastra.
Utwórz tabelę
Polecenie create table jest reprezentowane przez instrukcję Kusto. Funkcja Mgmt służy do wykonywania poleceń zarządzania. Służy do wykonywania polecenia w celu utworzenia tabeli.
func createTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}
Napiwek
Instrukcja Kusto jest domyślnie stała, aby uzyskać lepsze zabezpieczenia. NewStmt
akceptuje stałe ciągów. Interfejs UnsafeStmt
API umożliwia używanie segmentów instrukcji niestałych, ale nie jest zalecane.
Polecenie Kusto create table jest następujące:
.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
Tworzenie mapowania
Mapowania danych są używane podczas pozyskiwania do mapowania danych przychodzących na kolumny w tabelach usługi Azure Data Explorer. Aby uzyskać więcej informacji, zobacz mapowanie danych. Mapowanie jest tworzone w taki sam sposób jak tabela przy użyciu Mgmt
funkcji z nazwą bazy danych i odpowiednim poleceniem. Pełne polecenie jest dostępne w repozytorium GitHub dla przykładu.
func createMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping - ", err)
}
log.Printf("Mapping %s created\n", kustoMappingRefName)
}
Pozyskiwanie danych
Pozyskiwanie jest kolejkowane przy użyciu pliku z istniejącego kontenera usługi Azure Blob Storage.
func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
Klient pozyskiwania jest tworzony przy użyciu pozyskiwania. Nowy. Funkcja FromFile służy do odwoływania się do identyfikatora URI usługi Azure Blob Storage. Nazwa odwołania do mapowania i typ danych są przekazywane w postaci FileOption.
Uruchamianie aplikacji
Sklonuj przykładowy kod z usługi GitHub:
git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
Uruchom przykładowy kod, jak pokazano w tym fragmencie kodu z pliku
main.go
:func main { ... dropTable(kc, kustoDB) createTable(kc, kustoDB) createMapping(kc, kustoDB) ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable) ... }
Napiwek
Aby wypróbować różne kombinacje operacji, możesz usunąć komentarz/oznaczyć odpowiednie funkcje w pliku
main.go
.Po uruchomieniu przykładowego kodu są wykonywane następujące akcje:
- Usuwanie tabeli:
StormEvents
tabela jest usuwana (jeśli istnieje). - Tworzenie tabeli:
StormEvents
tworzona jest tabela. - Tworzenie mapowania:
StormEvents_CSV_Mapping
mapowanie jest tworzone. - Pozyskiwanie plików: plik CSV (w usłudze Azure Blob Storage) jest kolejkowany do pozyskiwania.
- Usuwanie tabeli:
Aby utworzyć jednostkę usługi na potrzeby uwierzytelniania, użyj interfejsu wiersza polecenia platformy Azure za pomocą polecenia az ad sp create-for-rbac . Ustaw informacje o jednostce usługi z punktem końcowym klastra i nazwą bazy danych w postaci zmiennych środowiskowych, które będą używane przez program:
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export AZURE_SP_TENANT_ID="<replace with tenant>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Uruchom program:
go run main.go
Uzyskasz podobne dane wyjściowe:
Connected to Azure Data Explorer Using database - testkustodb Failed to drop StormEvents table. Maybe it does not exist? Table StormEvents created in DB testkustodb Mapping StormEvents_CSV_Mapping created Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
Weryfikowanie i rozwiązywanie problemów
Poczekaj od 5 do 10 minut na zaplanowanie procesu pozyskiwania w kolejce i załadowanie danych do usługi Azure Data Explorer.
Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem. Następnie uruchom następujące polecenie, aby uzyskać liczbę rekordów w
StormEvents
tabeli.StormEvents | count
Uruchom następujące polecenie w bazie danych, aby sprawdzić, czy wystąpiły jakieś niepowodzenia pozyskiwania w ciągu ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Uruchom następujące polecenie, aby wyświetlić stan wszystkich operacji pozyskiwania z ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Czyszczenie zasobów
Jeśli planujesz postępować zgodnie z innymi artykułami, zachowaj utworzone zasoby. Jeśli nie, uruchom następujące polecenie w bazie danych, aby usunąć tabelę StormEvents
.
.drop table StormEvents