Partager via


Ingérer des données à l’aide du kit de développement logiciel (SDK) Go Azure Data Explorer

L’Explorateur de données Azure est un service d’exploration de données rapide et hautement évolutive pour les données des journaux et les données de télémétrie. Il fournit une bibliothèque de client de kit de développement logiciel (SDK) Go à des fins d’interaction avec le service Azure Data Explorer. Vous pouvez utiliser le kit de développement logiciel (SDK) Go pour ingérer, contrôler et interroger les données des clusters Azure Data Explorer.

Dans cet article, vous allez d’abord créer une table et un mappage de données dans un cluster de test. Vous mettez ensuite l’ingestion en file d’attente l’ingestion sur le cluster à l’aide du kit de développement logiciel (SDK) Go et validez les résultats.

Prérequis

Installer le kit de développement logiciel (SDK) Go

Le kit de développement logiciel (SDK) Go Azure Data Explorer est automatiquement installé lorsque vous exécutez l’exemple d’application qui utilise les modules Go. Si vous avez installé le kit de développement logiciel (SDK) Go pour une autre application, créez un module Go et récupérez le package Azure Data Explorer (en utilisant go get), par exemple :

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

La dépendance du package est ajoutée au fichier go.mod. Utilisez-le dans votre application

Vérifier le code

Cette section Révision du code est facultative. Si vous souhaitez savoir comment le code fonctionne, vous pouvez consulter les extraits de code suivants. Sinon, vous pouvez directement passer à la section Exécution de l’application.

Authenticate

Le programme doit s’authentifier auprès du service Azure Data Explorer avant d’exécuter des opérations.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Une instance d’autorisation Kusto est créée à l’aide des informations d’identification du principal de service. Elle est ensuite utilisée pour créer un client kusto.Client avec la fonction Nouveau qui accepte également le point de terminaison du cluster.

Créer une table

La commande create table est représentée par une instruction Kusto. La fonction Mgmt est utilisée pour exécuter des commandes de gestion. Elle permet d’exécuter la commande afin de créer une table.

func createTable(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
  if err != nil {
    log.Fatal("failed to create table", err)
  }
  log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Conseil

Par défaut, une instruction Kusto est constante à des fins de sécurité renforcée. NewStmt accepte les constantes de chaînes. L’API UnsafeStmt autorise l’utilisation de segments d’instruction non constants, mais ce n’est pas conseillé.

La commande Kusto create table est la suivante :

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Créer un mappage

Les mappages de données sont utilisés lors de l’ingestion pour mapper les données entrantes avec les colonnes des tables Azure Data Explorer. Pour plus d’informations, consultez les Mappage de données. Le mappage est créé de la même façon qu’une table, à l’aide de la fonction Mgmt, avec le nom de la base de données et la commande appropriée. La commande complète est disponible dans le référentiel GitHub correspondant à l’exemple.

func createMapping(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
  if err != nil {
    log.Fatal("failed to create mapping - ", err)
  }
  log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Ingérer des données

Une ingestion est mise en file d’attente à l’aide d’un fichier provenant d’un conteneur Stockage Blob Azure existant.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
  kIngest, err := ingest.New(kc, kustoDB, kustoTable)
  if err != nil {
    log.Fatal("failed to create ingestion client", err)
  }
  blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
  err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

  if err != nil {
    log.Fatal("failed to ingest file", err)
  }
  log.Println("Ingested file from -", blobStorePath)
}

Le client Ingestion est créé à l’aide de ingest.New. La fonction FromFile est utilisée pour faire référence à l’URI du Stockage Blob Azure. Le nom de la référence de mappage et le type de données sont transmis sous la forme de FileOption.

Exécution de l'application

  1. Clonez l’exemple de code depuis GitHub :

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Exécutez l’exemple de code comme indiqué dans cet extrait de code depuis main.go :

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Conseil

    Pour tester différentes combinaisons d’opérations, vous pouvez supprimer les marques de commentaire des fonctions qui conviennent dans main.go.

    Lorsque vous exécutez l’exemple de code, les actions suivantes sont effectuées :

    1. Supprimer une table : la table StormEvents est supprimée (si elle existe).
    2. Création de la table : la table StormEvents est créée.
    3. Création du mappage : le mappage StormEvents_CSV_Mapping est créé.
    4. Ingestion de fichier : un fichier CSV (dans Stockage Blob Azure) est mis en file d’attente pour l’ingestion.
  3. Pour créer un principal de service pour l’authentification, utilisez Azure CLI avec la commande az ad sp create-for-rbac. Définissez les informations du principal de service avec le point de terminaison du cluster et le nom de la base de données sous forme de variables d’environnement qui seront utilisées par le programme :

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Exécutez le programme :

    go run main.go
    

    La sortie se présente comme suit :

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Valider et résoudre les problèmes

Attendez 5 à 10 minutes avant que l’ingestion en file d’attente planifie le processus d’ingestion et charge les données dans Azure Data Explorer.

  1. Connectez-vous à https://dataexplorer.azure.com et à votre cluster. Exécutez ensuite la commande suivante pour obtenir le nombre d’enregistrements de la table StormEvents.

    StormEvents | count
    
  2. Exécutez la commande suivante dans votre base de données pour voir si des échecs d’ingestion se sont produits ces quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Exécutez la commande suivante pour voir l’état de toutes les opérations d’ingestion des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Nettoyer les ressources

Si vous envisagez de suivre nos autres articles, conservez les ressources que vous avez créées. Sinon, exécutez la commande suivante dans votre base de données pour supprimer la table StormEvents.

.drop table StormEvents

Étape suivante