Ingesta de datos mediante el SDK de .NET de Kusto
Hay dos bibliotecas cliente para .NET: una biblioteca de ingesta y una biblioteca de datos. Para más información sobre el SDK para .NET, consulte la documentación sobre el SDK para .NET. Estas bibliotecas permiten ingerir (cargar) datos en un clúster y consultar datos desde el código. En este artículo, primero creará una tabla y la asignación de datos en un clúster de prueba. A continuación, pondrá en cola la ingesta en el clúster y validará los resultados.
Requisitos previos
- Una cuenta de Microsoft o una identidad de usuario de Microsoft Entra. No se necesita una suscripción a Azure.
- Un clúster y una base de datos. Cree un clúster y una base de datos.
Instalación de la biblioteca de ingesta
Install-Package Microsoft.Azure.Kusto.Ingest
Adición de autenticación y creación de la cadena de conexión
Autenticación
Para autenticar una aplicación, el SDK usa el identificador de inquilino de Microsoft Entra. Para buscar el identificador de inquilino, use la dirección URL siguiente, sustituyendo su dominio por SuDominio.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Por ejemplo, si el nombre de dominio es contoso.com, la dirección URL es: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Haga clic en esta dirección URL para ver los resultados. la primera línea es como sigue.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
En este caso es el id. de inquilino es 6babcaad-604b-40ac-a9d7-9fd97c0b779f
.
En este ejemplo, se utiliza una autenticación de usuario de Microsoft Entra interactiva para acceder al clúster. También puede usar la autenticación de la aplicación Microsoft Entra con un certificado o secreto de la aplicación. Asegúrese de establecer los valores de tenantId
y clusterUri
antes de ejecutar este código.
El SDK es una manera cómoda de configurar el método de autenticación como parte de la cadena de conexión. Para ver la documentación completa sobre las cadenas de conexión, consulte Cadenas de conexión.
Nota:
La versión actual del SDK no admite la autenticación interactiva de usuarios en .NET Core. Si es necesario, use en su lugar el nombre de usuario o la contraseña de Microsoft Entra o la autenticación de la aplicación.
Creación de la cadena de conexión
Ahora puede construir la cadena de conexión. Creará la tabla de destino y la asignación en un paso posterior.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
Definición de la información del archivo de origen
Establezca la ruta de acceso del archivo de origen. Este ejemplo utiliza un archivo de ejemplo hospedado en Azure Blob Storage. El conjunto de datos de ejemplo de StormEvents contiene datos relacionados con el tiempo de los National Centers for Environmental Information.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
Creación de una tabla en el clúster de prueba
Cree una tabla que denominada StormEvents
que coincida con el esquema de los datos del archivo StormEvents.csv
.
Sugerencia
Los fragmentos de código siguientes crean una instancia de un cliente para casi todas las llamadas. Esto se hace para que cada fragmento de código se pueda ejecutar de forma individual. En producción, las instancias de cliente son de reentrada y deben mantenerse todo el tiempo que sea necesario. Una única instancia de cliente por URI es suficiente, aunque se trabaje con varias bases de datos (la base de datos se puede especificar en el nivel de comando).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Definición de la asignación de ingesta
Asigna los datos de CSV entrantes a los nombres de columna utilizados al crear la tabla. Aprovisione un objeto de asignación de columnas de CSV en esa tabla.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Definición de la directiva de procesamiento por lotes para la tabla
El procesamiento por lotes de datos entrantes optimiza el tamaño de la partición de datos, que está controlado por la directiva de procesamiento por lotes de ingesta. Modifique la directiva con el comando de administración de directivas de procesamiento por lotes de ingesta. Use esta directiva para reducir la latencia de los datos que llegan lentamente.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
Se recomienda definir un valor Raw Data Size
para los datos ingeridos y reducir de manera incremental el tamaño a 250 MB, a la vez que se comprueba si mejora el rendimiento.
Puede usar la propiedad Flush Immediately
para omitir el procesamiento por lotes, aunque no se recomienda para la ingesta a gran escala, ya que puede provocar un rendimiento deficiente.
Colocación de un mensaje en cola para la ingesta
Coloca en cola un mensaje para extraer datos desde Blob Storage e ingerirlos. Se establece una conexión al clúster de ingesta y se crea otro cliente para trabajar con ese punto de conexión.
Sugerencia
Los fragmentos de código siguientes crean una instancia de un cliente para casi todas las llamadas. Esto se hace para que cada fragmento de código se pueda ejecutar de forma individual. En producción, las instancias de cliente son de reentrada y deben mantenerse todo el tiempo que sea necesario. Una única instancia de cliente por URI es suficiente, aunque se trabaje con varias bases de datos (la base de datos se puede especificar en el nivel de comando).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Validación de la ingesta de los datos en la tabla
Espere entre cinco y diez minutos para que la ingesta en cola programe la ingesta y cargue los datos en el clúster. A continuación, ejecute el siguiente código para obtener el recuento de registros de la tabla StormEvents
.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
Ejecución de consultas de solución de problemas
Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster Ejecute el siguiente comando en la base de datos para ver si se ha producido algún error de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Ejecute el siguiente comando para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Limpieza de recursos
Si tiene previsto seguir nuestros otros artículos, conserve los recursos que creó. De lo contrario, ejecute el siguiente comando en la base de datos para limpiar la tabla StormEvents
.
.drop table StormEvents