Ingesta de datos de ejemplo con formato JSON en Azure Data Explorer
Artículo
En este artículo, se muestra cómo ingerir datos con formato JSON en una base de datos de Azure Data Explorer. Comenzará con ejemplos sencillos de JSON asignado y sin formato, continuará con JSON multilínea y, a continuación, abordará esquemas JSON más complejos que contengan matrices y diccionarios. En los ejemplos se detalla el proceso de ingesta de datos con formato JSON mediante Lenguaje de consulta Kusto (KQL), C# o Python.
Nota
No se recomienda usar .ingest comandos de administración en escenarios de producción. En su lugar, use un conector de datos o ingiera datos mediante programación mediante una de las bibliotecas cliente de Kusto.
Requisitos previos
Una cuenta microsoft o una identidad de usuario Microsoft Entra. No se necesita una suscripción a Azure.
Azure Data Explorer admite dos formatos de archivo JSON:
json: JSON separado por líneas. Cada línea de los datos de entrada tiene exactamente un registro JSON. Este formato admite el análisis de comentarios y propiedades entre comillas únicas. Para más información, consulte JSON Lines.
multijson: JSON multilínea. El analizador omite los separadores de línea y lee un registro desde la posición anterior hasta el final de un JSON válido.
Nota
Al ingerir mediante el Asistente para ingesta, el formato predeterminado es multijson. El formato puede controlar registros JSON de varias líneas y matrices de registros JSON. Cuando se encuentra un error de análisis, se descarta todo el archivo. Para omitir los registros JSON no válidos, seleccione la opción "Omitir errores de formato de datos". Esto cambiará el formato a json (líneas JSON).
Si usa el formato de línea JSON (json), las líneas que no representan registros JSON válidos se omiten durante el análisis.
Ingesta y asignación de datos con formato JSON
La ingesta de datos con formato JSON requiere que especifique el formato mediante la propiedad de ingesta. La ingesta de datos JSON requiere la asignación, que asigna una entrada de origen JSON a su columna de destino. Al ingerir de datos, use la propiedad IngestionMapping con su propiedad de ingesta ingestionMappingReference (para una asignación predefinida) o su propiedad IngestionMappings. En este artículo se usará la propiedad de ingesta ingestionMappingReference, que está predefinida en la tabla que se utiliza para la ingesta. En los ejemplos siguientes, comenzaremos mediante la ingesta de registros JSON como datos sin procesar en una tabla de una sola columna. A continuación, usaremos la asignación para ingerir cada propiedad en su columna asignada.
Ejemplo de JSON simple
El siguiente ejemplo es un JSON simple, con una estructura plana. Los datos tienen información sobre la temperatura y la humedad, recopilados por varios dispositivos. Cada registro se marca con un identificador y una marca de tiempo.
En este ejemplo, se ingieren registros JSON como datos sin procesar en una tabla de una sola columna. La manipulación de datos, mediante consultas, y la directiva de actualización se realizan una vez que se ingieren los datos.
En el cuadro de diálogo Agregar clúster, escriba la URL del clúster con el formato https://<ClusterName>.<Region>.kusto.windows.net/ y después haga clic en Agregar.
Pegue el siguiente comando y seleccione Ejecutar para crear la tabla.
.create table RawEvents (Event: dynamic)
Esta consulta crea una tabla con una sola columna Event de un tipo de datos dinámico.
Este comando crea una asignación y asigna la ruta de acceso raíz de JSON $ a la columna Event.
Ingiera los datos en la tabla RawEvents.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Use C# para ingerir datos en formato JSON sin procesar.
Cree la tabla RawEvents.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
En esta asignación, tal y como se define en el esquema de tabla, las entradas timestamp se ingerirán en la columna Time como tipos de datos datetime.
Ingiera los datos en la tabla Events.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
El archivo "simple.JSON" tiene algunos registros JSON separados por líneas. El formato es json, y la asignación utilizada en el comando de ingesta es el elemento FlatEventMapping que creó.
Cree una tabla con un esquema similar a los datos de entrada JSON. Usaremos esta tabla en todos los ejemplos y comandos de ingesta siguientes.
En esta asignación, tal y como se define en el esquema de tabla, las entradas timestamp se ingerirán en la columna Time como tipos de datos datetime.
Ingiera los datos en la tabla Events.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
El archivo "simple.JSON" tiene algunos registros JSON separados por líneas. El formato es json, y la asignación utilizada en el comando de ingesta es el elemento FlatEventMapping que creó.
Cree una tabla con un esquema similar a los datos de entrada JSON. Usaremos esta tabla en todos los ejemplos y comandos de ingesta siguientes.
El archivo "simple.JSON" tiene algunos registros JSON separados por líneas. El formato es json, y la asignación utilizada en el comando de ingesta es el elemento FlatEventMapping que creó.
Ingesta de registros JSON multilínea
En este ejemplo, se ingieren registros JSON multilínea. Cada propiedad JSON se asigna a una sola columna de la tabla. El archivo "multilined.json" tiene algunos registros JSON con sangría. El formato multijson indica que la estructura JSON lee los registros.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Ingiera los datos en la tabla Events.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Los tipos de datos de matriz son una colección ordenada de valores. La ingesta de una matriz JSON se realiza mediante una directiva de actualización. El JSON se ingiere tal cual en una tabla intermedia. Una directiva de actualización ejecuta una función predefinida en la tabla RawEvents, de forma que se vuelven a ingerir los resultados en la tabla de destino. Los datos se ingerirán con la siguiente estructura:
Cree una función update policy que expanda la recopilación de datos de records para que cada valor de la recopilación reciba una fila independiente, mediante el operador mv-expand. Se usará la tabla RawEvents como tabla de origen y Events como tabla de destino.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
El esquema que recibe la función debe coincidir con el esquema de la tabla de destino. Utilice el operador getschema para revisar el esquema.
EventRecordsExpand() | getschema
Agregue la directiva de actualización a la tabla de destino. Esta directiva ejecuta automáticamente la consulta en todos los datos recién ingeridos en la tabla intermedia RawEvents e ingiere los resultados en la tabla Events. Defina una directiva de retención cero para evitar que se conserve la tabla intermedia.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Revise los datos de la tabla Events.
Events
Cree una función de actualización que expanda la recopilación de records para que cada valor de la recopilación reciba una fila independiente, mediante el operador mv-expand. Se usará la tabla RawEvents como tabla de origen y Events como tabla de destino.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Nota
El esquema que recibe la función debe coincidir con el esquema de la tabla de destino.
Agregue la directiva de actualización a la tabla de destino. Esta directiva ejecuta automáticamente la consulta en todos los datos recién ingeridos en la tabla intermedia RawEvents e ingerirá los resultados en la tabla Events. Defina una directiva de retención cero para evitar que se conserve la tabla intermedia.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Revise los datos de la tabla Events.
Cree una función de actualización que expanda la recopilación de records para que cada valor de la recopilación reciba una fila independiente, mediante el operador mv-expand. Se usará la tabla RawEvents como tabla de origen y Events como tabla de destino.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Nota
El esquema que recibe la función debe coincidir con el esquema de la tabla de destino.
Agregue la directiva de actualización a la tabla de destino. Esta directiva ejecuta automáticamente la consulta en todos los datos recién ingeridos en la tabla intermedia RawEvents e ingerirá los resultados en la tabla Events. Defina una directiva de retención cero para evitar que se conserve la tabla intermedia.