Usare Hub eventi e .NET per inviare e ricevere messaggi degli argomenti di Atlas Kafka
Nota
Il Microsoft Purview Data Catalog sta cambiando il nome in Microsoft Purview Unified Catalog. Tutte le funzionalità rimarranno invariate. Verrà visualizzata la modifica del nome quando la nuova esperienza di governance dei dati di Microsoft Purview è disponibile a livello generale nell'area. Controllare il nome nell'area.
Questa guida introduttiva illustra come inviare e ricevere gli eventi degli argomenti di Atlas Kafka . Si userà Hub eventi di Azure e la libreria .NET di Azure.Messaging.EventHubs.
Prerequisiti
Se non si ha familiarita' con Hub eventi, vedere Panoramica di Hub eventi prima di completare questa guida introduttiva.
Per seguire questa guida introduttiva, sono necessari alcuni prerequisiti:
- Sottoscrizione di Microsoft Azure. Per usare i servizi di Azure, inclusi Hub eventi, è necessaria una sottoscrizione di Azure. Se non si dispone di un account Azure, è possibile iscriversi per una versione di valutazione gratuita o usare i vantaggi per gli abbonati MSDN quando si crea un account.
-
Microsoft Visual Studio 2022. La libreria client di Hub eventi usa le nuove funzionalità introdotte in C# 8.0. È comunque possibile usare la libreria con le versioni C# precedenti, ma la nuova sintassi non sarà disponibile. Per usare la sintassi completa, è consigliabile compilare con .NET Core SDK 3.0 o versione successiva e la versione del linguaggio impostata su
latest
. Se si usa una versione di Visual Studio precedente a Visual Studio 2019, non dispone degli strumenti necessari per compilare progetti C# 8.0. Visual Studio 2022, inclusa l'edizione community gratuita, può essere scaricato qui. - Un account Microsoft Purview attivo.
-
Hub eventi configurato con l'account Microsoft Purview per inviare e ricevere messaggi:
- È possibile che l'account sia già configurato. È possibile controllare l'account Microsoft Purview nel portale di Azure in Impostazioni, configurazione Kafka. Se non è già configurato, seguire questa guida.
Pubblicare messaggi in Microsoft Purview
Si creerà un'applicazione console .NET Core che invia eventi a Microsoft Purview tramite l'argomento Kafka di Hub eventi ATLAS_HOOK.
Per pubblicare messaggi in Microsoft Purview, è necessario un hub eventi gestito o almeno un hub eventi con una configurazione hook.
Creare un progetto di Visual Studio
Creare quindi un'applicazione console .NET C# in Visual Studio:
- Avviare Visual Studio.
- Nella finestra Start selezionare Crea una nuovaapp console di progetto> (.NET Framework). È necessaria .NET versione 4.5.2 o successiva.
- In Nome progetto immettere PurviewKafkaProducer.
- Selezionare Crea per creare il progetto.
Creare un'applicazione console
- Avviare Visual Studio 2022.
- Selezionare Crea un nuovo progetto.
- Nella finestra di dialogo Crea un nuovo progetto seguire questa procedura: Se questa finestra di dialogo non viene visualizzata, selezionare File dal menu, selezionare Nuovo e quindi Selezionare Progetto.
- Selezionare C# per il linguaggio di programmazione.
- Selezionare Console per il tipo di applicazione.
- Selezionare App console (.NET Core) dall'elenco dei risultati.
- Selezionare quindi Avanti.
Aggiungere il pacchetto NuGet di Hub eventi
Selezionare Strumenti>NuGet Package ManagerPackage Manager> Console dal menu.
Eseguire il comando seguente per installare il pacchetto NuGet Azure.Messaging.EventHubs e il pacchetto NuGet Azure.Messaging.EventHubs.Producer :
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
Scrivere codice che invia messaggi all'hub eventi
Aggiungere le istruzioni seguenti
using
all'inizio del file Program.cs :using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
Aggiungere costanti alla
Program
classe per il nome di Hub eventi stringa di connessione e Hub eventi.private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
Sostituire il
Main
metodo con il metodo seguenteasync Main
e aggiungere un oggettoasync ProduceMessage
per eseguire il push dei messaggi in Microsoft Purview. Per informazioni dettagliate, vedere i commenti nel codice.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
Generare il progetto. Verificare che non vi siano errori.
Eseguire il programma e attendere il messaggio di conferma.
Nota
Per il codice sorgente completo con altri commenti informativi, vedere questo file in GitHub
Codice di esempio che crea una tabella SQL con due colonne usando un messaggio JSON crea entità
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
Ricevere messaggi di Microsoft Purview
Informazioni su come scrivere un'applicazione console .NET Core che riceve messaggi da hub eventi usando un processore di eventi. Il processore di eventi gestisce checkpoint persistenti e ricezioni parallele da hub eventi. Ciò semplifica il processo di ricezione degli eventi. È necessario usare l'hub eventi ATLAS_ENTITIES per ricevere messaggi da Microsoft Purview.
Per ricevere messaggi da Microsoft Purview, sono necessari un hub eventi gestito o una configurazione di notifica di Hub eventi.
Avviso
Event Hubs SDK usa la versione più recente dell'API di archiviazione disponibile. Tale versione potrebbe non essere necessariamente disponibile nella piattaforma Stack Hub. Se si esegue questo codice nell'hub di Azure Stack, si verificheranno errori di runtime a meno che non si abbia come destinazione la versione specifica in uso. Se si usa Archiviazione BLOB di Azure come archivio di checkpoint, esaminare la versione supportata dell'API di archiviazione di Azure per la compilazione dell'hub di Azure Stack e nel codice specificare come destinazione tale versione.
La versione più recente disponibile del servizio di archiviazione è la versione 2019-02-02. Per impostazione predefinita, la libreria client di Hub eventi SDK usa la versione più elevata disponibile in Azure (2019-07-07 al momento della versione dell'SDK). Se si usa l'hub di Azure Stack versione 2005, oltre a seguire i passaggi descritti in questa sezione, sarà anche necessario aggiungere codice destinato all'API del servizio di archiviazione versione 2019-02-02. Per informazioni su come impostare come destinazione una versione specifica dell'API di archiviazione, vedere questo esempio in GitHub.
Creare un archivio di Azure e un contenitore BLOB
Archiviazione di Azure verrà usato come archivio checkpoint. Usare la procedura seguente per creare un account di archiviazione di Azure.
Ottenere il stringa di connessione per l'account di archiviazione
Prendere nota del stringa di connessione e del nome del contenitore. Verranno usati nel codice di ricezione.
Creare un progetto di Visual Studio per il ricevitore
- Nella finestra Esplora soluzioni selezionare e tenere premuto (o fare clic con il pulsante destro del mouse) sulla soluzione EventHubQuickStart, scegliere Aggiungi e selezionare Nuovo progetto.
- Selezionare App console (.NET Core) e quindi Avanti.
- Immettere PurviewKafkaConsumer come nome del progetto e selezionare Crea.
Aggiungere il pacchetto NuGet di Hub eventi
Selezionare Strumenti>NuGet Package ManagerPackage Manager> Console dal menu.
Eseguire il comando seguente per installare il pacchetto NuGet Azure.Messaging.EventHubs :
Install-Package Azure.Messaging.EventHubs
Eseguire il comando seguente per installare il pacchetto NuGet Azure.Messaging.EventHubs.Processor :
Install-Package Azure.Messaging.EventHubs.Processor
Aggiornare il metodo Main
Aggiungere le istruzioni seguenti
using
all'inizio del file Program.cs .using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
Aggiungere costanti alla
Program
classe per l'stringa di connessione hub eventi e il nome dell'hub eventi. Sostituire i segnaposto tra parentesi quadre con i valori reali ottenuti quando è stato creato l'hub eventi e l'account di archiviazione (chiavi di accesso - stringa di connessione primaria). Assicurarsi che sia{Event Hubs namespace connection string}
il stringa di connessione a livello di spazio dei nomi e non la stringa dell'hub eventi.private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
Usare ATLAS_ENTITIES come nome dell'hub eventi quando si inviano messaggi a Microsoft Purview.
Sostituire il
Main
metodo con il metodo seguenteasync Main
. Per informazioni dettagliate, vedere i commenti nel codice.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
Aggiungere ora alla classe i seguenti metodi di gestione degli eventi e degli errori.
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Generare il progetto. Verificare che non vi siano errori.
Nota
Per il codice sorgente completo con altri commenti informativi, vedere questo file in GitHub.
Eseguire l'applicazione ricevitore.
Esempio di messaggio ricevuto da Microsoft Purview
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
Passaggi successivi
Vedere altri esempi in GitHub.