Compartir a través de


Ingesta de datos mediante el SDK de Azure Data Explorer para Go

El Explorador de datos de Azure es un servicio de exploración de datos altamente escalable y rápido para datos de telemetría y registro. Proporciona una biblioteca cliente del SDK para Go para interactuar con el servicio Azure Data Explorer. Puede usar el SDK para Go para ingerir, controlar y consultar los datos de los clústeres de Azure Data Explorer.

En este artículo, primero creará una tabla y la asignación de datos en un clúster de prueba. A continuación, pondrá en cola la ingesta en el clúster con el SDK para Go y validará los resultados.

Requisitos previos

Instalación del SDK para Go

El SDK de Azure Data Explorer para Go se instalará automáticamente al ejecutar la aplicación de ejemplo, que usa módulos de Go. Si ha instalado el SDK para Go para otra aplicación, cree un módulo de Go y recupere el paquete de Azure Data Explorer (con go get); por ejemplo:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Las dependencias del paquete se agregarán al archivo go.mod. Úselo en la aplicación de Go.

Revisión del código

La sección Revisión del código es opcional. Si le interesa aprender cómo funciona el código, puede revisar los siguientes fragmentos de código. En caso contrario, puede ir directamente a Ejecutar la aplicación.

Authenticate

El programa debe autenticarse en el servicio Azure Data Explorer antes de ejecutar cualquier operación.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Se crea una instancia de kusto.Authorization con las credenciales de la entidad de servicio. A continuación, se usa para crear una instancia de kusto.Client con la función New, que también acepta el punto de conexión del clúster.

Crear tabla

El comando CREATE TABLE se representa mediante una instrucción de Kusto y se usa la función Mgmt para ejecutar los comandos de administración. Se usa para ejecutar el comando para crear una tabla.

func createTable(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
  if err != nil {
    log.Fatal("failed to create table", err)
  }
  log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Sugerencia

De forma predeterminada, una instrucción de Kusto es constante para mejorar la seguridad. NewStmt acepta constantes de cadena. La API UnsafeStmt permite el uso de segmentos de instrucciones no constantes, pero no se recomienda.

El comando de Kusto para CREATE TABLE es el siguiente:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Creación de la asignación

Las asignaciones de datos se usan durante la ingesta para asignar los datos entrantes a las columnas dentro de las tablas de Azure Data Explorer. Para más información, consulte Asignación de datos. La asignación se crea de la misma forma que una tabla, mediante el uso de la función Mgmt con el nombre de la base de datos y el comando adecuado. El comando completo está disponible en el repositorio de GitHub del ejemplo.

func createMapping(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
  if err != nil {
    log.Fatal("failed to create mapping - ", err)
  }
  log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Ingerir datos

Una ingesta se pone en cola mediante un archivo de un contenedor de Azure Blob Storage existente.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
  kIngest, err := ingest.New(kc, kustoDB, kustoTable)
  if err != nil {
    log.Fatal("failed to create ingestion client", err)
  }
  blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
  err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

  if err != nil {
    log.Fatal("failed to ingest file", err)
  }
  log.Println("Ingested file from -", blobStorePath)
}

El cliente de ingesta se crea con ingest.New. La función FromFile se utiliza para hacer referencia al identificador URI de Azure Blob Storage. El nombre de la referencia de asignación y el tipo de datos se pasan en forma de FileOption.

Ejecución de la aplicación

  1. Clone el código de ejemplo de GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Ejecute el código de ejemplo tal y como se muestra en este fragmento de código de main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Sugerencia

    Para probar diferentes combinaciones de operaciones, puede quitar la marca de comentario o comentar las funciones respectivas en main.go.

    Al ejecutar el código de ejemplo, se realizan las siguientes acciones:

    1. Eliminar tabla: se elimina la tabla StormEvents (si existe).
    2. Creación de tablas: se crea la tabla StormEvents.
    3. Creación de asignación: se crea la asignación StormEvents_CSV_Mapping.
    4. Ingesta de archivos: se pone en cola un archivo CSV (en Azure Blob Storage) para la ingesta.
  3. Para crear una entidad de servicio para la autenticación, use la CLI de Azure con el comando az ad sp create-for-rbac. Establezca la información de la entidad de servicio con el punto de conexión del clúster y el nombre de la base de datos en forma de las variables de entorno que usará el programa:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Ejecute el programa:

    go run main.go
    

    Obtendrá una salida similar a:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Validación y solución de problemas

Espere entre 5 y 10 minutos para que la ingesta en cola programe el proceso de ingesta y cargue los datos en Azure Data Explorer.

  1. Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster A continuación, ejecute el siguiente comando para obtener el recuento de registros de la tabla StormEvents.

    StormEvents | count
    
  2. Ejecute el siguiente comando en la base de datos para ver si se ha producido algún error de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Ejecute el siguiente comando para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Limpieza de recursos

Si tiene previsto seguir nuestros otros artículos, conserve los recursos que creó. De lo contrario, ejecute el siguiente comando en la base de datos para eliminar la tabla StormEvents.

.drop table StormEvents

Paso siguiente