Créer une connexion de données Event Hubs pour l’explorateur de données Azure Synapse à l’aide de C\# (préversion)
Azure Synapse Data Explorer est un service d’exploration de données rapide et hautement scalable pour les données des journaux et les données de télémétrie. Azure SynapseData Explorer offre une ingestion (chargement de données) à partir de hubs d’événements, de hubs IoT et d’objets blob écrits dans des conteneurs d’objets blob.
Dans cet article, vous créez une connexion de données Event Hubs pour l’explorateur de données Azure Synapse à l’aide de C\#.
Prérequis
Un abonnement Azure. Créez un compte Azure gratuit.
Créez un pool Data Explorer en utilisant Synapse Studio ou le portail Azure
Créez une base de données Data Explorer.
Dans Synapse Studio, dans le volet de gauche, sélectionnez Données.
Sélectionnez +(Ajouter une nouvelle ressource) >Pool Data Explorer et utilisez les informations suivantes :
Paramètre Valeur suggérée Description Nom du pool contosodataexplorer Nom du pool Data Explorer à utiliser Nom TestDatabase Ce nom de base de données doit être unique dans le cluster. Période de conservation par défaut 365 Intervalle de temps (en jours) pendant lequel vous avez la garantie d’avoir les données à disposition pour les interroger. Cet intervalle se mesure à partir du moment où les données sont ingérées. Période de cache par défaut 31 Intervalle de temps (en jours) pendant lequel les données fréquemment interrogées restent disponibles dans le stockage SSD ou la RAM, plutôt que dans un stockage à plus long terme. Sélectionnez Créer pour créer la base de données. La création prend généralement moins d’une minute.
Notes
L’ingestion de données à partir d’Event Hub dans des pools de l’Explorateur de données ne fonctionne pas si votre espace de travail Synapse utilise un réseau virtuel managé avec la protection contre l’exfiltration des données activée.
- Visual Studio 2019. Téléchargez et utilisez gratuitement Visual Studio 2019 Community Edition. Activez le développement Azure lors de l’installation de Visual Studio.
Créer une table sur votre cluster de test
Créez une table nommée StormEvents
qui correspond au schéma des données dans le fichier StormEvents.csv
.
Conseil
Les extraits de code suivants créent une instance d’un client pour presque tous les appels. Cela permet de rendre chaque extrait de code exécutable individuellement. En production, les instances de client sont réentrantes et doivent être conservées aussi longtemps que nécessaire. Une seule instance de client par URI suffit, même quand vous utilisez plusieurs bases de données (la base de données peut être spécifiée au niveau de la commande).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Définir le mappage d’ingestion
Mappez les données CSV entrantes aux noms de colonnes utilisés lors de la création de la table. Provisionnez un objet de mappage de colonne CSV sur cette table.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Installer le package NuGet C#
- Installez le package NuGet Microsoft.Azure.Management.Kusto.
Authentification
Pour exécuter l’exemple suivant, vous avez besoin d’une application Microsoft Entra et d’un principal de service ayant accès aux ressources. Pour créer une application Microsoft Entra gratuite et ajouter une attribution de rôle au niveau de l’abonnement, consultez Créer une application Microsoft Entra. L’ID de répertoire (locataire), l’ID d’application et la clé secrète client sont également nécessaires.
Ajouter une connexion de données Event Hubs
L’exemple suivant montre comment ajouter par programmation une connexion de données Event Hubs. Consultez Se connecter à Event Hubs pour plus d’informations sur l’ajout d’une connexion de données Event Hubs à l’aide du portail Azure.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Paramètre | Valeur suggérée | Description du champ |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Votre ID de client. Également appelé ID de répertoire. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID d’abonnement que vous utilisez pour la création de ressources. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID client de l’application qui peut accéder aux ressources figurant dans votre locataire. |
clientSecret | xxxxxxxxxxxxxx | Secret client de l’application qui peut accéder aux ressources figurant dans votre locataire. |
resourceGroupName | testrg | Nom du groupe de ressources qui contient votre cluster. |
clusterName | mykustocluster | Nom de votre cluster. |
databaseName | mykustodatabase | Nom de la base de données cible dans votre cluster. |
dataConnectionName | myeventhubconnect | Nom souhaité de votre connexion de données. |
tableName | StormEvents | Nom de la table cible dans la base de données cible. |
mappingRuleName | StormEvents_CSV_Mapping | Nom de votre mappage de colonnes associé à la table cible. |
dataFormat | csv | Format de données du message. |
eventHubResourceId | ID de ressource | ID de ressource de votre hub d’événements qui contient les données pour l’ingestion. |
consumerGroup | $Default | Groupe de consommateurs de votre hub d’événements. |
location | USA Centre | Emplacement de la ressource de connexion de données. |
compression | Gzip ou Aucune | Type de compression de données. |
Générer les données
Consultez l’exemple d’application qui génère des données et les envoie à un hub d’événements.
Un événement peut contenir un ou plusieurs enregistrements, jusqu’à sa limite de taille. Dans l’exemple suivant, nous envoyons deux événements, chacun avec cinq enregistrements ajoutés :
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Nettoyer les ressources
Pour supprimer la connexion de données, utilisez la commande suivante :
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);