Partager via


Créer une connexion de données Event Hubs pour l’explorateur de données Azure Synapse à l’aide de C\# (préversion)

Azure Synapse Data Explorer est un service d’exploration de données rapide et hautement scalable pour les données des journaux et les données de télémétrie. Azure SynapseData Explorer offre une ingestion (chargement de données) à partir de hubs d’événements, de hubs IoT et d’objets blob écrits dans des conteneurs d’objets blob.

Dans cet article, vous créez une connexion de données Event Hubs pour l’explorateur de données Azure Synapse à l’aide de C\#.

Prérequis

  • Un abonnement Azure. Créez un compte Azure gratuit.

  • Créez un pool Data Explorer en utilisant Synapse Studio ou le portail Azure

  • Créez une base de données Data Explorer.

    1. Dans Synapse Studio, dans le volet de gauche, sélectionnez Données.

    2. Sélectionnez +(Ajouter une nouvelle ressource) >Pool Data Explorer et utilisez les informations suivantes :

      Paramètre Valeur suggérée Description
      Nom du pool contosodataexplorer Nom du pool Data Explorer à utiliser
      Nom TestDatabase Ce nom de base de données doit être unique dans le cluster.
      Période de conservation par défaut 365 Intervalle de temps (en jours) pendant lequel vous avez la garantie d’avoir les données à disposition pour les interroger. Cet intervalle se mesure à partir du moment où les données sont ingérées.
      Période de cache par défaut 31 Intervalle de temps (en jours) pendant lequel les données fréquemment interrogées restent disponibles dans le stockage SSD ou la RAM, plutôt que dans un stockage à plus long terme.
    3. Sélectionnez Créer pour créer la base de données. La création prend généralement moins d’une minute.

Notes

L’ingestion de données à partir d’Event Hub dans des pools de l’Explorateur de données ne fonctionne pas si votre espace de travail Synapse utilise un réseau virtuel managé avec la protection contre l’exfiltration des données activée.

Créer une table sur votre cluster de test

Créez une table nommée StormEvents qui correspond au schéma des données dans le fichier StormEvents.csv.

Conseil

Les extraits de code suivants créent une instance d’un client pour presque tous les appels. Cela permet de rendre chaque extrait de code exécutable individuellement. En production, les instances de client sont réentrantes et doivent être conservées aussi longtemps que nécessaire. Une seule instance de client par URI suffit, même quand vous utilisez plusieurs bases de données (la base de données peut être spécifiée au niveau de la commande).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Définir le mappage d’ingestion

Mappez les données CSV entrantes aux noms de colonnes utilisés lors de la création de la table. Provisionnez un objet de mappage de colonne CSV sur cette table.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Installer le package NuGet C#

Authentification

Pour exécuter l’exemple suivant, vous avez besoin d’une application Microsoft Entra et d’un principal de service ayant accès aux ressources. Pour créer une application Microsoft Entra gratuite et ajouter une attribution de rôle au niveau de l’abonnement, consultez Créer une application Microsoft Entra. L’ID de répertoire (locataire), l’ID d’application et la clé secrète client sont également nécessaires.

Ajouter une connexion de données Event Hubs

L’exemple suivant montre comment ajouter par programmation une connexion de données Event Hubs. Consultez Se connecter à Event Hubs pour plus d’informations sur l’ajout d’une connexion de données Event Hubs à l’aide du portail Azure.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Paramètre Valeur suggérée Description du champ
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Votre ID de client. Également appelé ID de répertoire.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID d’abonnement que vous utilisez pour la création de ressources.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID client de l’application qui peut accéder aux ressources figurant dans votre locataire.
clientSecret xxxxxxxxxxxxxx Secret client de l’application qui peut accéder aux ressources figurant dans votre locataire.
resourceGroupName testrg Nom du groupe de ressources qui contient votre cluster.
clusterName mykustocluster Nom de votre cluster.
databaseName mykustodatabase Nom de la base de données cible dans votre cluster.
dataConnectionName myeventhubconnect Nom souhaité de votre connexion de données.
tableName StormEvents Nom de la table cible dans la base de données cible.
mappingRuleName StormEvents_CSV_Mapping Nom de votre mappage de colonnes associé à la table cible.
dataFormat csv Format de données du message.
eventHubResourceId ID de ressource ID de ressource de votre hub d’événements qui contient les données pour l’ingestion.
consumerGroup $Default Groupe de consommateurs de votre hub d’événements.
location USA Centre Emplacement de la ressource de connexion de données.
compression Gzip ou Aucune Type de compression de données.

Générer les données

Consultez l’exemple d’application qui génère des données et les envoie à un hub d’événements.

Un événement peut contenir un ou plusieurs enregistrements, jusqu’à sa limite de taille. Dans l’exemple suivant, nous envoyons deux événements, chacun avec cinq enregistrements ajoutés :

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Nettoyer les ressources

Pour supprimer la connexion de données, utilisez la commande suivante :

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Étapes suivantes