Ingesta de datos mediante el SDK de Java de Kusto
El Explorador de datos de Azure es un servicio de exploración de datos altamente escalable y rápido para datos de telemetría y registro. La biblioteca cliente de Java se puede usar para ingerir datos, comandos de administración de problemas y consultar datos en clústeres de Azure Data Explorer.
En este artículo, aprenderá a ingerir datos mediante la biblioteca de Java de Azure Data Explorer. En primer lugar, creará una tabla y una asignación de datos en un clúster de prueba. A continuación, pondrá en cola una ingesta desde el almacenamiento de blobs al clúster mediante el SDK de Java y validará los resultados.
Requisitos previos
- Una cuenta de Microsoft o una identidad de usuario de Microsoft Entra. No se necesita una suscripción a Azure.
- Un clúster y la base de datos de Azure Data Explorer. Cree un clúster y una base de datos.
- Git.
- JDK versión 1.8 o posterior.
- Maven.
- Cree un registro de aplicación y concédale permisos en la base de datos . Guarde tanto el identificador de cliente como el secreto de cliente para usarlos más adelante.
Revisión del código
Esta sección es opcional. Examine los siguientes fragmentos de código para aprender cómo funciona el código. Para omitir esta sección, vaya a ejecutar la aplicación.
Autenticación
El programa usa las credenciales de autenticación de Microsoft Entra con ConnectionStringBuilder".
Cree un
com.microsoft.azure.kusto.data.Client
para la consulta y administración.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Cree y use
com.microsoft.azure.kusto.ingest.IngestClient
para poner en cola la ingesta de datos en Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Comandos de administración
Los comandos de administración, como .drop
y .create
, se ejecutan llamando a execute
en un com.microsoft.azure.kusto.data.Client
objeto .
Por ejemplo, la tabla StormEvents
se crea de la siguiente manera:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Ingesta de datos
Ponga en cola una ingesta, para lo que debe usar un archivo de un contenedor de Azure Blob Storage existente.
- Use
BlobSourceInfo
para especificar la ruta de acceso de Blob Storage. - Utilice
IngestionProperties
para definir la tabla, la base de datos, el nombre de la asignación y el tipo de datos. En el ejemplo siguiente, el tipo de datos esCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
El proceso de ingesta se inicia en un subproceso independiente y el subproceso main
espera a que se complete el subproceso de ingesta. Este proceso usa CountdownLatch. La API de ingesta (IngestClient#ingestFromBlob
) no es asincrónica. Se usa un bucle while
para sondear el estado actual cada 5 segundos y espera a que el estado de ingesta cambie de Pending
a otro estado. El estado final puede ser Succeeded
, Failed
o PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Sugerencia
Hay otros métodos para controlar la ingesta de forma asincrónica para las diferentes aplicaciones. Por ejemplo, puede usar CompletableFuture
para crear una canalización que defina la acción posterior a la ingesta, como consultar la tabla, o controlar las excepciones que se han comunicado a IngestionStatus
.
Ejecución de la aplicación
General
Al ejecutar el código de ejemplo, se realizan las siguientes acciones:
- Eliminar tabla: se elimina la tabla
StormEvents
(si existe). - Creación de tablas: se crea la tabla
StormEvents
. - Creación de asignación: se crea la asignación
StormEvents_CSV_Mapping
. - Ingesta de archivos: se pone en cola un archivo CSV (en Azure Blob Storage) para la ingesta.
El siguiente código de ejemplo procede de App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Sugerencia
Para probar diferentes combinaciones de operaciones, puede quitar la marca de comentario o comentar las funciones respectivas en App.java
.
Ejecución de la aplicación
Clone el código de ejemplo de GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Establezca la información de la entidad de servicio con la siguiente información como las variables del entorno que usa el programa:
- Punto de conexión del clúster
- Nombre de la base de datos
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Compilación y ejecución:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
La salida debe ser similar a:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Espere unos minutos hasta que se complete el proceso de captura. Después de que se complete correctamente, verá el siguiente mensaje de registro: Ingestion completed successfully
. Puede salir del programa en este momento y pasar al siguiente paso sin que ello afecte al proceso de ingesta, que ya se ha puesto en la cola.
Validación
Espere entre 5 y 10 minutos para que la ingesta en cola programe el proceso de ingesta y cargue los datos en Azure Data Explorer.
Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster
Ejecute el siguiente comando para obtener el recuento de registros de la tabla
StormEvents
:StormEvents | count
Solución de problemas
Para ver si se ha producido algún error de ingesta en las últimas cuatro horas, ejecute el siguiente comando en la base de datos:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas, ejecute el siguiente comando:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Limpieza de recursos
Si no tiene previsto usar los recursos que ha creado, ejecute el siguiente comando en la base de datos para anular la tabla StormEvents
.
.drop table StormEvents