Compartir a través de


Ingesta de datos mediante el SDK de Java de Kusto

El Explorador de datos de Azure es un servicio de exploración de datos altamente escalable y rápido para datos de telemetría y registro. La biblioteca cliente de Java se puede usar para ingerir datos, comandos de administración de problemas y consultar datos en clústeres de Azure Data Explorer.

En este artículo, aprenderá a ingerir datos mediante la biblioteca de Java de Azure Data Explorer. En primer lugar, creará una tabla y una asignación de datos en un clúster de prueba. A continuación, pondrá en cola una ingesta desde el almacenamiento de blobs al clúster mediante el SDK de Java y validará los resultados.

Requisitos previos

Revisión del código

Esta sección es opcional. Examine los siguientes fragmentos de código para aprender cómo funciona el código. Para omitir esta sección, vaya a ejecutar la aplicación.

Autenticación

El programa usa las credenciales de autenticación de Microsoft Entra con ConnectionStringBuilder".

  1. Cree un com.microsoft.azure.kusto.data.Client para la consulta y administración.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Cree y use com.microsoft.azure.kusto.ingest.IngestClient para poner en cola la ingesta de datos en Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Comandos de administración

Los comandos de administración, como .drop y .create, se ejecutan llamando a execute en un com.microsoft.azure.kusto.data.Client objeto .

Por ejemplo, la tabla StormEvents se crea de la siguiente manera:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Ingesta de datos

Ponga en cola una ingesta, para lo que debe usar un archivo de un contenedor de Azure Blob Storage existente.

  • Use BlobSourceInfo para especificar la ruta de acceso de Blob Storage.
  • Utilice IngestionProperties para definir la tabla, la base de datos, el nombre de la asignación y el tipo de datos. En el ejemplo siguiente, el tipo de datos es CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

El proceso de ingesta se inicia en un subproceso independiente y el subproceso main espera a que se complete el subproceso de ingesta. Este proceso usa CountdownLatch. La API de ingesta (IngestClient#ingestFromBlob) no es asincrónica. Se usa un bucle while para sondear el estado actual cada 5 segundos y espera a que el estado de ingesta cambie de Pending a otro estado. El estado final puede ser Succeeded, Failed o PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Sugerencia

Hay otros métodos para controlar la ingesta de forma asincrónica para las diferentes aplicaciones. Por ejemplo, puede usar CompletableFuture para crear una canalización que defina la acción posterior a la ingesta, como consultar la tabla, o controlar las excepciones que se han comunicado a IngestionStatus.

Ejecución de la aplicación

General

Al ejecutar el código de ejemplo, se realizan las siguientes acciones:

  1. Eliminar tabla: se elimina la tabla StormEvents (si existe).
  2. Creación de tablas: se crea la tabla StormEvents.
  3. Creación de asignación: se crea la asignación StormEvents_CSV_Mapping.
  4. Ingesta de archivos: se pone en cola un archivo CSV (en Azure Blob Storage) para la ingesta.

El siguiente código de ejemplo procede de App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Sugerencia

Para probar diferentes combinaciones de operaciones, puede quitar la marca de comentario o comentar las funciones respectivas en App.java.

Ejecución de la aplicación

  1. Clone el código de ejemplo de GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Establezca la información de la entidad de servicio con la siguiente información como las variables del entorno que usa el programa:

    • Punto de conexión del clúster
    • Nombre de la base de datos
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Compilación y ejecución:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    La salida debe ser similar a:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Espere unos minutos hasta que se complete el proceso de captura. Después de que se complete correctamente, verá el siguiente mensaje de registro: Ingestion completed successfully. Puede salir del programa en este momento y pasar al siguiente paso sin que ello afecte al proceso de ingesta, que ya se ha puesto en la cola.

Validación

Espere entre 5 y 10 minutos para que la ingesta en cola programe el proceso de ingesta y cargue los datos en Azure Data Explorer.

  1. Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster

  2. Ejecute el siguiente comando para obtener el recuento de registros de la tabla StormEvents:

    StormEvents | count
    

Solución de problemas

  1. Para ver si se ha producido algún error de ingesta en las últimas cuatro horas, ejecute el siguiente comando en la base de datos:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas, ejecute el siguiente comando:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Limpieza de recursos

Si no tiene previsto usar los recursos que ha creado, ejecute el siguiente comando en la base de datos para anular la tabla StormEvents.

.drop table StormEvents