Sdílet prostřednictvím


Ingestování dat pomocí sady Azure Data Explorer Go SDK

Azure Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Poskytuje klientskou knihovnu Go SDK pro interakci se službou Azure Data Explorer. Sadu Go SDK můžete použít k ingestování, řízení a dotazování dat v clusterech Azure Data Exploreru.

V tomto článku nejprve vytvoříte tabulku a mapování dat v testovacím clusteru. Pak do clusteru zařadíte příjem dat do fronty pomocí sady Go SDK a ověříte výsledky.

Požadavky

Instalace sady Go SDK

Sada Azure Data Explorer Go SDK se automaticky nainstaluje při spuštění [ukázkové aplikace, která používá moduly Go. Pokud jste nainstalovali sadu Go SDK pro jinou aplikaci, vytvořte modul Go a načtěte balíček Azure Data Exploreru (pomocí go get), například:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Závislost balíčku se přidá do go.mod souboru. Použijte ji v aplikaci Go.

Kontrola kódu

Tato kontrola oddílu kódu je volitelná. Pokud se chcete dozvědět, jak kód funguje, můžete si projít následující fragmenty kódu. V opačném případě můžete přeskočit k spuštění aplikace.

Ověření

Program se musí před provedením jakýchkoli operací ověřit ve službě Azure Data Explorer.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Instance kusto. Autorizace se vytvoří pomocí přihlašovacích údajů instančního objektu. Pak se používá k vytvoření kusto. Klient s novou funkcí, která přijímá také koncový bod clusteru.

Vytvořit tabulku

Příkaz create table je reprezentován příkazem Kusto. Funkce Mgmt se používá ke spouštění příkazů pro správu. Slouží ke spuštění příkazu k vytvoření tabulky.

func createTable(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
  if err != nil {
    log.Fatal("failed to create table", err)
  }
  log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Tip

Příkaz Kusto je ve výchozím nastavení konstantní pro lepší zabezpečení. NewStmt přijímá řetězcové konstanty. Rozhraní UnsafeStmt API umožňuje používat nekontinutní segmenty příkazů, ale nedoporučuje se.

Příkaz Kusto create table je následující:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Vytvoření mapování

Mapování dat se používají během příjmu dat k mapování příchozích dat na sloupce v tabulkách Azure Data Exploreru. Další informace najdete v tématu mapování dat. Mapování se vytvoří stejným způsobem jako tabulka pomocí Mgmt funkce s názvem databáze a příslušným příkazem. Úplný příkaz je k dispozici v úložišti GitHubu pro ukázku.

func createMapping(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
  if err != nil {
    log.Fatal("failed to create mapping - ", err)
  }
  log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Ingestace dat

Příjem dat se zařadí do fronty pomocí souboru z existujícího kontejneru Azure Blob Storage.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
  kIngest, err := ingest.New(kc, kustoDB, kustoTable)
  if err != nil {
    log.Fatal("failed to create ingestion client", err)
  }
  blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
  err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

  if err != nil {
    log.Fatal("failed to ingest file", err)
  }
  log.Println("Ingested file from -", blobStorePath)
}

Klient příjmu dat se vytvoří pomocí ingestování. Nový. Funkce FromFile se používá k odkazu na identifikátor URI služby Azure Blob Storage. Název odkazu na mapování a datový typ se předávají ve formě FileOption.

Spuštění aplikace

  1. Naklonujte ukázkový kód z GitHubu:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Spusťte ukázkový kód, jak je vidět v tomto fragmentu kódu:main.go

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Tip

    Pokud chcete vyzkoušet různé kombinace operací, můžete odkomentovat nebo okomentovat příslušné funkce v main.gosouboru .

    Při spuštění ukázkového kódu se provádějí následující akce:

    1. Drop table: StormEvents table is dropd (if it exists).
    2. Vytvoření tabulky: StormEvents vytvoří se tabulka.
    3. Vytvoření mapování: StormEvents_CSV_Mapping vytvoří se mapování.
    4. Příjem souborů: Soubor CSV (ve službě Azure Blob Storage) se zařadí do fronty pro příjem dat.
  3. K vytvoření instančního objektu pro ověřování použijte Azure CLI s příkazem az ad sp create-for-rbac . Nastavte informace instančního objektu s koncovým bodem clusteru a názvem databáze ve formě proměnných prostředí, které bude program používat:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Spusťte program:

    go run main.go
    

    Získáte podobný výstup:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Ověření a řešení potíží

Počkejte 5 až 10 minut, než příjem dat ve frontě naplánuje proces příjmu dat a načte data do Azure Data Exploreru.

  1. Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru. Potom spuštěním následujícího příkazu získejte počet záznamů v StormEvents tabulce.

    StormEvents | count
    
  2. Spuštěním následujícího příkazu ve vaší databázi zjistíte, jestli za poslední čtyři hodiny došlo k chybám ingestování. Přes spuštěním nahraďte název databáze.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Spuštěním následujícího příkazu zobrazíte stav všech operací ingestace za poslední čtyři hodiny. Přes spuštěním nahraďte název databáze.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Vyčištění prostředků

Pokud máte v úmyslu postupovat podle našich dalších článků, ponechte prostředky, které jste vytvořili. Pokud ne, spuštěním následujícího příkazu v databázi tabulku odstraňte StormEvents .

.drop table StormEvents

Další krok