Utiliser Event Hubs et .NET pour envoyer et recevoir des messages de rubriques Atlas Kafka
Ce guide de démarrage rapide vous apprend à envoyer et à recevoir des événements de rubriques Atlas Kafka . Nous allons utiliser Azure Event Hubs et la bibliothèque .NET Azure.Messaging.EventHubs.
Configuration requise
Si vous débutez avec Event Hubs, consultez Vue d’ensemble d’Event Hubs avant de terminer ce guide de démarrage rapide.
Pour suivre ce guide de démarrage rapide, vous devez disposer de certaines conditions préalables :
- Un abonnement Microsoft Azure. Pour utiliser les services Azure, y compris Event Hubs, vous avez besoin d’un abonnement Azure. Si vous n’avez pas de compte Azure, vous pouvez vous inscrire à un essai gratuit ou utiliser les avantages de votre abonné MSDN lorsque vous créez un compte.
-
Microsoft Visual Studio 2022. La bibliothèque de client Event Hubs utilise les nouvelles fonctionnalités introduites dans C# 8.0. Vous pouvez toujours utiliser la bibliothèque avec les versions C# précédentes, mais la nouvelle syntaxe ne sera pas disponible. Pour utiliser la syntaxe complète, il est recommandé de compiler avec le SDK .NET Core 3.0 ou version ultérieure et la version du langage définie sur
latest
. Si vous utilisez une version de Visual Studio antérieure à Visual Studio 2019, elle ne dispose pas des outils nécessaires pour générer des projets C# 8.0. Visual Studio 2022, y compris l’édition Community gratuite, peut être téléchargé ici. - Un compte Microsoft Purview actif.
-
Un Hubs d’événements configuré avec votre compte Microsoft Purview pour envoyer et recevoir des messages :
- Votre compte est peut-être déjà configuré. Vous pouvez case activée votre compte Microsoft Purview dans le Portail Azure sous Paramètres, configuration Kafka. S’il n’est pas déjà configuré, suivez ce guide.
Publier des messages sur Microsoft Purview
Nous allons créer une application console .NET Core qui envoie des événements à Microsoft Purview via la rubrique Kafka Event Hubs , ATLAS_HOOK.
Pour publier des messages sur Microsoft Purview, vous aurez besoin d’un Event Hubs managé ou d’au moins un Event Hubs avec une configuration de hook.
Créer un projet Visual Studio
Ensuite, créez une application console C# .NET dans Visual Studio :
- Lancez Visual Studio.
- Dans la fenêtre Démarrer, sélectionnez Créer une application console de projet>(.NET Framework). .NET version 4.5.2 ou ultérieure est obligatoire.
- Dans Nom du projet, entrez PurviewKafkaProducer.
- Sélectionnez Créer pour créer le projet.
Créer une application console
- Démarrez Visual Studio 2022.
- Sélectionnez Créer un projet.
- Dans la boîte de dialogue Créer un projet , procédez comme suit : Si cette boîte de dialogue ne s’affiche pas, sélectionnez Fichier dans le menu, sélectionnez Nouveau, puis Projet.
- Sélectionnez C# pour le langage de programmation.
- Sélectionnez Console pour le type de l’application.
- Sélectionnez Application console (.NET Core) dans la liste des résultats.
- Puis sélectionnez Suivant.
Ajouter le package NuGet Event Hubs
Sélectionnez Outils>NuGet Package ManagerConsole du gestionnaire > depackage dans le menu.
Exécutez la commande suivante pour installer le package NuGet Azure.Messaging.EventHubs et le package NuGet Azure.Messaging.EventHubs.Producer :
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
Écrire du code qui envoie des messages au hub d’événements
Ajoutez les instructions suivantes
using
en haut du fichier Program.cs :using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
Ajoutez des constantes à la
Program
classe pour le chaîne de connexion Event Hubs et le nom Event Hubs.private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
Remplacez la
Main
méthode par la méthode suivanteasync Main
et ajoutez unasync ProduceMessage
pour envoyer des messages dans Microsoft Purview. Pour plus d’informations, consultez les commentaires dans le code.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
Créez le projet. Assurez-vous qu'il n'y a pas d'erreurs.
Exécutez le programme et attendez le message de confirmation.
Remarque
Pour obtenir le code source complet avec des commentaires plus détaillés, consultez ce fichier dans GitHub
Exemple de code qui crée une table SQL avec deux colonnes à l’aide d’un message JSON Créer une entité
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
Recevoir des messages Microsoft Purview
Découvrez ensuite comment écrire une application console .NET Core qui reçoit des messages d’Event Hubs à l’aide d’un processeur d’événements. Le processeur d’événements gère les points de contrôle persistants et les réceptions parallèles à partir d’Event Hubs. Cela simplifie le processus de réception des événements. Vous devez utiliser le hub d’événements ATLAS_ENTITIES pour recevoir des messages de Microsoft Purview.
Pour recevoir des messages de Microsoft Purview, vous avez besoin d’un Event Hubs managé ou d’une configuration de notification Event Hubs.
Avertissement
Le Kit de développement logiciel (SDK) Event Hubs utilise la version la plus récente de l’API de stockage disponible. Cette version n’est pas nécessairement disponible sur votre plateforme Stack Hub. Si vous exécutez ce code sur Azure Stack Hub, vous rencontrerez des erreurs d’exécution, sauf si vous ciblez la version spécifique que vous utilisez. Si vous utilisez Stockage Blob Azure comme magasin de points de contrôle, passez en revue la version de l’API Stockage Azure prise en charge pour votre build Azure Stack Hub et, dans votre code, ciblez cette version.
La version la plus élevée disponible du service de stockage est la version 2019-02-02. Par défaut, la bibliothèque de client du KIT de développement logiciel (SDK) Event Hubs utilise la version la plus élevée disponible sur Azure (2019-07-07 au moment de la publication du SDK). Si vous utilisez Azure Stack Hub version 2005, en plus de suivre les étapes décrites dans cette section, vous devez également ajouter du code qui cible l’API du service de stockage version 2019-02-02. Pour savoir comment cibler une version spécifique de l’API de stockage, consultez cet exemple dans GitHub.
Créer un stockage Azure et un conteneur d’objets blob
Nous allons utiliser stockage Azure comme magasin de points de contrôle. Procédez comme suit pour créer un compte de stockage Azure.
Obtenir les chaîne de connexion pour le compte de stockage
Notez le chaîne de connexion et le nom du conteneur. Vous les utiliserez dans le code de réception.
Créer un projet Visual Studio pour le récepteur
- Dans la fenêtre Explorateur de solutions, sélectionnez la solution EventHubQuickStart et maintenez la touche enfoncée (ou cliquez avec le bouton droit), pointez sur Ajouter, puis sélectionnez Nouveau projet.
- Sélectionnez Application console (.NET Core), puis Suivant.
- Entrez PurviewKafkaConsumer comme nom du projet, puis sélectionnez Créer.
Ajouter le package NuGet Event Hubs
Sélectionnez Outils>NuGet Package ManagerConsole du gestionnaire > depackage dans le menu.
Exécutez la commande suivante pour installer le package NuGet Azure.Messaging.EventHubs :
Install-Package Azure.Messaging.EventHubs
Exécutez la commande suivante pour installer le package NuGet Azure.Messaging.EventHubs.Processor :
Install-Package Azure.Messaging.EventHubs.Processor
Mettre à jour la méthode Main
Ajoutez les instructions suivantes
using
en haut du fichier Program.cs .using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
Ajoutez des constantes à la
Program
classe pour le chaîne de connexion Event Hubs et le nom du hub d’événements. Remplacez les espaces réservés entre crochets par les valeurs réelles que vous avez obtenues lors de la création du hub d’événements et du compte de stockage (clés d’accès - chaîne de connexion primaire). Assurez-vous que est{Event Hubs namespace connection string}
la chaîne de connexion au niveau de l’espace de noms, et non la chaîne du hub d’événements.private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
Utilisez ATLAS_ENTITIES comme nom de hub d’événements lors de l’envoi de messages à Microsoft Purview.
Remplacez la
Main
méthode par la méthode suivanteasync Main
. Pour plus d’informations, consultez les commentaires dans le code.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
Ajoutez maintenant les méthodes de gestionnaire d’événements et d’erreurs suivantes à la classe .
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Créez le projet. Assurez-vous qu'il n'y a pas d'erreurs.
Remarque
Pour obtenir le code source complet avec des commentaires plus détaillés, consultez ce fichier sur GitHub.
Exécutez l’application réceptrice.
Exemple de message reçu de Microsoft Purview
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
Étapes suivantes
Consultez d’autres exemples dans GitHub.