Ingestování dat pomocí sady Azure Data Explorer Go SDK
Azure Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Poskytuje klientskou knihovnu Go SDK pro interakci se službou Azure Data Explorer. Sadu Go SDK můžete použít k ingestování, řízení a dotazování dat v clusterech Azure Data Exploreru.
V tomto článku nejprve vytvoříte tabulku a mapování dat v testovacím clusteru. Pak do clusteru zařadíte příjem dat do fronty pomocí sady Go SDK a ověříte výsledky.
Požadavky
- Účet Microsoft nebo identita uživatele Microsoft Entra. Předplatné Azure není povinné.
- Cluster a databáze Azure Data Exploreru. Vytvořte cluster a databázi.
- Nainstalujte Git.
- Nainstalujte Go s následujícími minimálními požadavky sady Go SDK.
- Vytvořte registraci aplikace a udělte jí oprávnění k databázi. Uložte ID klienta a tajný klíč klienta pro pozdější použití.
Instalace sady Go SDK
Sada Azure Data Explorer Go SDK se automaticky nainstaluje při spuštění [ukázkové aplikace, která používá moduly Go. Pokud jste nainstalovali sadu Go SDK pro jinou aplikaci, vytvořte modul Go a načtěte balíček Azure Data Exploreru (pomocí go get
), například:
go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto
Závislost balíčku se přidá do go.mod
souboru. Použijte ji v aplikaci Go.
Kontrola kódu
Tato kontrola oddílu kódu je volitelná. Pokud se chcete dozvědět, jak kód funguje, můžete si projít následující fragmenty kódu. V opačném případě můžete přeskočit k spuštění aplikace.
Ověření
Program se musí před provedením jakýchkoli operací ověřit ve službě Azure Data Explorer.
auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)
Instance kusto. Autorizace se vytvoří pomocí přihlašovacích údajů instančního objektu. Pak se používá k vytvoření kusto. Klient s novou funkcí, která přijímá také koncový bod clusteru.
Vytvořit tabulku
Příkaz create table je reprezentován příkazem Kusto. Funkce Mgmt se používá ke spouštění příkazů pro správu. Slouží ke spuštění příkazu k vytvoření tabulky.
func createTable(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
if err != nil {
log.Fatal("failed to create table", err)
}
log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}
Tip
Příkaz Kusto je ve výchozím nastavení konstantní pro lepší zabezpečení. NewStmt
přijímá řetězcové konstanty. Rozhraní UnsafeStmt
API umožňuje používat nekontinutní segmenty příkazů, ale nedoporučuje se.
Příkaz Kusto create table je následující:
.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)
Vytvoření mapování
Mapování dat se používají během příjmu dat k mapování příchozích dat na sloupce v tabulkách Azure Data Exploreru. Další informace najdete v tématu mapování dat. Mapování se vytvoří stejným způsobem jako tabulka pomocí Mgmt
funkce s názvem databáze a příslušným příkazem. Úplný příkaz je k dispozici v úložišti GitHubu pro ukázku.
func createMapping(kc *kusto.Client, kustoDB string) {
_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
if err != nil {
log.Fatal("failed to create mapping - ", err)
}
log.Printf("Mapping %s created\n", kustoMappingRefName)
}
Ingestace dat
Příjem dat se zařadí do fronty pomocí souboru z existujícího kontejneru Azure Blob Storage.
func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
kIngest, err := ingest.New(kc, kustoDB, kustoTable)
if err != nil {
log.Fatal("failed to create ingestion client", err)
}
blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))
if err != nil {
log.Fatal("failed to ingest file", err)
}
log.Println("Ingested file from -", blobStorePath)
}
Klient příjmu dat se vytvoří pomocí ingestování. Nový. Funkce FromFile se používá k odkazu na identifikátor URI služby Azure Blob Storage. Název odkazu na mapování a datový typ se předávají ve formě FileOption.
Spuštění aplikace
Naklonujte ukázkový kód z GitHubu:
git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
Spusťte ukázkový kód, jak je vidět v tomto fragmentu kódu:
main.go
func main { ... dropTable(kc, kustoDB) createTable(kc, kustoDB) createMapping(kc, kustoDB) ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable) ... }
Tip
Pokud chcete vyzkoušet různé kombinace operací, můžete odkomentovat nebo okomentovat příslušné funkce v
main.go
souboru .Při spuštění ukázkového kódu se provádějí následující akce:
- Drop table:
StormEvents
table is dropd (if it exists). - Vytvoření tabulky:
StormEvents
vytvoří se tabulka. - Vytvoření mapování:
StormEvents_CSV_Mapping
vytvoří se mapování. - Příjem souborů: Soubor CSV (ve službě Azure Blob Storage) se zařadí do fronty pro příjem dat.
- Drop table:
K vytvoření instančního objektu pro ověřování použijte Azure CLI s příkazem az ad sp create-for-rbac . Nastavte informace instančního objektu s koncovým bodem clusteru a názvem databáze ve formě proměnných prostředí, které bude program používat:
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export AZURE_SP_TENANT_ID="<replace with tenant>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Spusťte program:
go run main.go
Získáte podobný výstup:
Connected to Azure Data Explorer Using database - testkustodb Failed to drop StormEvents table. Maybe it does not exist? Table StormEvents created in DB testkustodb Mapping StormEvents_CSV_Mapping created Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
Ověření a řešení potíží
Počkejte 5 až 10 minut, než příjem dat ve frontě naplánuje proces příjmu dat a načte data do Azure Data Exploreru.
Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru. Potom spuštěním následujícího příkazu získejte počet záznamů v
StormEvents
tabulce.StormEvents | count
Spuštěním následujícího příkazu ve vaší databázi zjistíte, jestli za poslední čtyři hodiny došlo k chybám ingestování. Přes spuštěním nahraďte název databáze.
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Spuštěním následujícího příkazu zobrazíte stav všech operací ingestace za poslední čtyři hodiny. Přes spuštěním nahraďte název databáze.
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Vyčištění prostředků
Pokud máte v úmyslu postupovat podle našich dalších článků, ponechte prostředky, které jste vytvořili. Pokud ne, spuštěním následujícího příkazu v databázi tabulku odstraňte StormEvents
.
.drop table StormEvents