Compartir vía


Creación de una conexión de datos de Event Hubs para el Explorador de datos de Azure Synapse mediante C# (Versión preliminar)

Azure Synapse Data Explorer es un servicio de exploración de datos rápido y altamente escalable para datos de telemetría y de registro. Azure Synapse Data Explorer permite ingerir (cargar) datos procedentes de Event Hubs, IoT Hubs y blobs escritos en contenedores de blobs.

En este artículo, creará una conexión de datos de Event Hubs para el Explorador de datos de Azure Synapse mediante C#.

Requisitos previos

  • Suscripción a Azure. Cree una cuenta de Azure gratuita.

  • Creación de un grupo de Data Explorer mediante Synapse Studio o Azure Portal

  • Cree una base de datos de Data Explorer.

    1. En Synapse Studio, en el panel izquierdo, seleccione Datos.

    2. Seleccione + (Agregar un recurso nuevo) >Grupo de explorador de datos, y use la siguiente información:

      Configuración Valor sugerido Descripción
      Nombre del grupo contosodataexplorer Nombre del grupo de Data Explorer que se usará.
      Name TestDatabase El nombre de la base de datos debe ser único dentro del clúster.
      Período de retención predeterminado 365 El intervalo de tiempo (en días) para el que se garantiza que los datos se mantengan disponibles para consultarlos. El intervalo de tiempo se mide desde el momento en que se ingieren los datos.
      Período de caché predeterminado 31 El intervalo de tiempo (en días) durante el que los datos consultados con frecuencia se van a mantener disponibles en el almacenamiento SSD o en la RAM, en lugar de en el almacenamiento a largo plazo.
    3. Seleccione Crear para crear la base de datos. Normalmente se tarda menos de un minuto.

Nota

La ingesta de datos de un centro de eventos en grupos de Data Explorer no funcionará si el área de trabajo de Synapse usa una red virtual administrada con la protección de filtración de datos habilitada.

Creación de una tabla en el clúster de prueba

Cree una tabla que denominada StormEvents que coincida con el esquema de los datos del archivo StormEvents.csv.

Sugerencia

Los fragmentos de código siguientes crean una instancia de un cliente para casi todas las llamadas. Esto se hace para que cada fragmento de código se pueda ejecutar de forma individual. En producción, las instancias de cliente son de reentrada y deben mantenerse todo el tiempo que sea necesario. Una única instancia de cliente por URI es suficiente, aunque se trabaje con varias bases de datos (la base de datos se puede especificar en el nivel de comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definición de la asignación de ingesta

Asigna los datos de CSV entrantes a los nombres de columna utilizados al crear la tabla. Aprovisione un objeto de asignación de columnas de CSV en esa tabla.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalación de NuGet C#

Autenticación

Para ejecutar el ejemplo siguiente, necesita una aplicación y una entidad de servicio de Microsoft Entra que puedan acceder a los recursos. Para crear una aplicación de Microsoft Entra gratuita y agregar una asignación de roles en el nivel de la suscripción, consulte Creación de una aplicación de Microsoft Entra. También necesita el identificador de directorio (inquilino), el identificador de aplicación y el secreto de cliente.

Incorporación de una conexión de datos de Event Hubs

En el ejemplo siguiente, se explica cómo se agrega una conexión de datos de Event Hubs mediante programación. Consulte conectarse a Event Hubs para obtener información sobre cómo agregar una conexión de datos de Event Hubs mediante Azure Portal.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Configuración Valor sugerido Descripción del campo
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx El identificador de inquilino. También conocido como identificador de directorio.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identificador de suscripción que se usa para la creación de recursos.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identificador del cliente de la aplicación que puede acceder a los recursos del inquilino.
clientSecret xxxxxxxxxxxxxx Secreto del cliente de la aplicación que puede acceder a los recursos del inquilino.
resourceGroupName testrg Nombre del grupo de recursos que contiene el clúster.
clusterName mykustocluster Nombre del clúster.
databaseName mykustodatabase Nombre de la base de datos de destino del clúster.
dataConnectionName myeventhubconnect Nombre que desea asignar a la conexión de datos.
tableName StormEvents Nombre de la tabla de destino de la base de datos de destino.
mappingRuleName StormEvents_CSV_Mapping Nombre de la asignación de columnas asociada a la tabla de destino.
dataFormat csv Formato de datos del mensaje.
eventHubResourceId Identificador del recurso Identificador del recurso del centro de eventos que contiene los datos que se van a ingerir.
consumerGroup $Default Grupo de consumidores del centro de eventos.
ubicación Centro de EE. UU. Ubicación del recurso de conexión de datos.
compression Gzip o Ninguno El tipo de compresión de datos.

Generación de datos

Examine la aplicación de ejemplo que genera los datos y los envía a un centro de eventos.

Un evento puede contener uno o más registros, hasta su límite de tamaño. En el ejemplo siguiente se envían dos eventos, cada uno tiene cinco registros anexados:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Limpieza de recursos

Para eliminar la conexión de datos, use el siguiente comando:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Pasos siguientes