Creación de una conexión de datos de Event Hubs para el Explorador de datos de Azure Synapse mediante C# (Versión preliminar)
Azure Synapse Data Explorer es un servicio de exploración de datos rápido y altamente escalable para datos de telemetría y de registro. Azure Synapse Data Explorer permite ingerir (cargar) datos procedentes de Event Hubs, IoT Hubs y blobs escritos en contenedores de blobs.
En este artículo, creará una conexión de datos de Event Hubs para el Explorador de datos de Azure Synapse mediante C#.
Requisitos previos
Suscripción a Azure. Cree una cuenta de Azure gratuita.
Creación de un grupo de Data Explorer mediante Synapse Studio o Azure Portal
Cree una base de datos de Data Explorer.
En Synapse Studio, en el panel izquierdo, seleccione Datos.
Seleccione + (Agregar un recurso nuevo) >Grupo de explorador de datos, y use la siguiente información:
Configuración Valor sugerido Descripción Nombre del grupo contosodataexplorer Nombre del grupo de Data Explorer que se usará. Name TestDatabase El nombre de la base de datos debe ser único dentro del clúster. Período de retención predeterminado 365 El intervalo de tiempo (en días) para el que se garantiza que los datos se mantengan disponibles para consultarlos. El intervalo de tiempo se mide desde el momento en que se ingieren los datos. Período de caché predeterminado 31 El intervalo de tiempo (en días) durante el que los datos consultados con frecuencia se van a mantener disponibles en el almacenamiento SSD o en la RAM, en lugar de en el almacenamiento a largo plazo. Seleccione Crear para crear la base de datos. Normalmente se tarda menos de un minuto.
Nota
La ingesta de datos de un centro de eventos en grupos de Data Explorer no funcionará si el área de trabajo de Synapse usa una red virtual administrada con la protección de filtración de datos habilitada.
- Visual Studio 2019: descargue y use Visual Studio 2019 Community Edition, que es una edición gratuita. Habilite Desarrollo de Azure durante la instalación de Visual Studio.
Creación de una tabla en el clúster de prueba
Cree una tabla que denominada StormEvents
que coincida con el esquema de los datos del archivo StormEvents.csv
.
Sugerencia
Los fragmentos de código siguientes crean una instancia de un cliente para casi todas las llamadas. Esto se hace para que cada fragmento de código se pueda ejecutar de forma individual. En producción, las instancias de cliente son de reentrada y deben mantenerse todo el tiempo que sea necesario. Una única instancia de cliente por URI es suficiente, aunque se trabaje con varias bases de datos (la base de datos se puede especificar en el nivel de comando).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definición de la asignación de ingesta
Asigna los datos de CSV entrantes a los nombres de columna utilizados al crear la tabla. Aprovisione un objeto de asignación de columnas de CSV en esa tabla.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Instalación de NuGet C#
- Instale el paquete Microsoft.Azure.Management.Kusto NuGet.
Autenticación
Para ejecutar el ejemplo siguiente, necesita una aplicación y una entidad de servicio de Microsoft Entra que puedan acceder a los recursos. Para crear una aplicación de Microsoft Entra gratuita y agregar una asignación de roles en el nivel de la suscripción, consulte Creación de una aplicación de Microsoft Entra. También necesita el identificador de directorio (inquilino), el identificador de aplicación y el secreto de cliente.
Incorporación de una conexión de datos de Event Hubs
En el ejemplo siguiente, se explica cómo se agrega una conexión de datos de Event Hubs mediante programación. Consulte conectarse a Event Hubs para obtener información sobre cómo agregar una conexión de datos de Event Hubs mediante Azure Portal.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Configuración | Valor sugerido | Descripción del campo |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | El identificador de inquilino. También conocido como identificador de directorio. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identificador de suscripción que se usa para la creación de recursos. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identificador del cliente de la aplicación que puede acceder a los recursos del inquilino. |
clientSecret | xxxxxxxxxxxxxx | Secreto del cliente de la aplicación que puede acceder a los recursos del inquilino. |
resourceGroupName | testrg | Nombre del grupo de recursos que contiene el clúster. |
clusterName | mykustocluster | Nombre del clúster. |
databaseName | mykustodatabase | Nombre de la base de datos de destino del clúster. |
dataConnectionName | myeventhubconnect | Nombre que desea asignar a la conexión de datos. |
tableName | StormEvents | Nombre de la tabla de destino de la base de datos de destino. |
mappingRuleName | StormEvents_CSV_Mapping | Nombre de la asignación de columnas asociada a la tabla de destino. |
dataFormat | csv | Formato de datos del mensaje. |
eventHubResourceId | Identificador del recurso | Identificador del recurso del centro de eventos que contiene los datos que se van a ingerir. |
consumerGroup | $Default | Grupo de consumidores del centro de eventos. |
ubicación | Centro de EE. UU. | Ubicación del recurso de conexión de datos. |
compression | Gzip o Ninguno | El tipo de compresión de datos. |
Generación de datos
Examine la aplicación de ejemplo que genera los datos y los envía a un centro de eventos.
Un evento puede contener uno o más registros, hasta su límite de tamaño. En el ejemplo siguiente se envían dos eventos, cada uno tiene cinco registros anexados:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Limpieza de recursos
Para eliminar la conexión de datos, use el siguiente comando:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);