Condividi tramite


Usare Hub eventi e .NET per inviare e ricevere messaggi degli argomenti di Atlas Kafka

Questa guida introduttiva illustra come inviare e ricevere gli eventi degli argomenti di Atlas Kafka . Si userà Hub eventi di Azure e la libreria .NET di Azure.Messaging.EventHubs.

Prerequisiti

Se non si ha familiarita' con Hub eventi, vedere Panoramica di Hub eventi prima di completare questa guida introduttiva.

Per seguire questa guida introduttiva, sono necessari alcuni prerequisiti:

  • Sottoscrizione di Microsoft Azure. Per usare i servizi di Azure, inclusi Hub eventi, è necessaria una sottoscrizione di Azure. Se non si dispone di un account Azure, è possibile iscriversi per una versione di valutazione gratuita o usare i vantaggi per gli abbonati MSDN quando si crea un account.
  • Microsoft Visual Studio 2022. La libreria client di Hub eventi usa le nuove funzionalità introdotte in C# 8.0. È comunque possibile usare la libreria con le versioni C# precedenti, ma la nuova sintassi non sarà disponibile. Per usare la sintassi completa, è consigliabile compilare con .NET Core SDK 3.0 o versione successiva e la versione del linguaggio impostata su latest. Se si usa una versione di Visual Studio precedente a Visual Studio 2019, non dispone degli strumenti necessari per compilare progetti C# 8.0. Visual Studio 2022, inclusa l'edizione community gratuita, può essere scaricato qui.
  • Un account Microsoft Purview attivo.
  • Hub eventi configurato con l'account Microsoft Purview per inviare e ricevere messaggi:
    • È possibile che l'account sia già configurato. È possibile controllare l'account Microsoft Purview nel portale di Azure in Impostazioni, configurazione Kafka. Se non è già configurato, seguire questa guida.

Pubblicare messaggi in Microsoft Purview

Si creerà un'applicazione console .NET Core che invia eventi a Microsoft Purview tramite l'argomento Kafka di Hub eventi ATLAS_HOOK.

Per pubblicare messaggi in Microsoft Purview, è necessario un hub eventi gestito o almeno un hub eventi con una configurazione hook.

Creare un progetto di Visual Studio

Creare quindi un'applicazione console .NET C# in Visual Studio:

  1. Avviare Visual Studio.
  2. Nella finestra Start selezionare Crea una nuovaapp console di progetto> (.NET Framework). È necessaria .NET versione 4.5.2 o successiva.
  3. In Nome progetto immettere PurviewKafkaProducer.
  4. Selezionare Crea per creare il progetto.

Creare un'applicazione console

  1. Avviare Visual Studio 2022.
  2. Selezionare Crea un nuovo progetto.
  3. Nella finestra di dialogo Crea un nuovo progetto seguire questa procedura: Se questa finestra di dialogo non viene visualizzata, selezionare File dal menu, selezionare Nuovo e quindi Selezionare Progetto.
    1. Selezionare C# per il linguaggio di programmazione.
    2. Selezionare Console per il tipo di applicazione.
    3. Selezionare App console (.NET Core) dall'elenco dei risultati.
    4. Selezionare quindi Avanti.

Aggiungere il pacchetto NuGet di Hub eventi

  1. Selezionare Strumenti>NuGet Package ManagerPackage Manager> Console dal menu.

  2. Eseguire il comando seguente per installare il pacchetto NuGet Azure.Messaging.EventHubs e il pacchetto NuGet Azure.Messaging.EventHubs.Producer :

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

Scrivere codice che invia messaggi all'hub eventi

  1. Aggiungere le istruzioni seguenti using all'inizio del file Program.cs :

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Aggiungere costanti alla Program classe per il nome di Hub eventi stringa di connessione e Hub eventi.

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Sostituire il Main metodo con il metodo seguente async Main e aggiungere un oggetto async ProduceMessage per eseguire il push dei messaggi in Microsoft Purview. Per informazioni dettagliate, vedere i commenti nel codice.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. Generare il progetto. Verificare che non vi siano errori.

  5. Eseguire il programma e attendere il messaggio di conferma.

    Nota

    Per il codice sorgente completo con altri commenti informativi, vedere questo file in GitHub

Codice di esempio che crea una tabella SQL con due colonne usando un messaggio JSON crea entità

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Ricevere messaggi di Microsoft Purview

Informazioni su come scrivere un'applicazione console .NET Core che riceve messaggi da hub eventi usando un processore di eventi. Il processore di eventi gestisce checkpoint persistenti e ricezioni parallele da hub eventi. Ciò semplifica il processo di ricezione degli eventi. È necessario usare l'hub eventi ATLAS_ENTITIES per ricevere messaggi da Microsoft Purview.

Per ricevere messaggi da Microsoft Purview, sono necessari un hub eventi gestito o una configurazione di notifica di Hub eventi.

Avviso

Event Hubs SDK usa la versione più recente dell'API di archiviazione disponibile. Tale versione potrebbe non essere necessariamente disponibile nella piattaforma Stack Hub. Se si esegue questo codice nell'hub di Azure Stack, si verificheranno errori di runtime a meno che non si abbia come destinazione la versione specifica in uso. Se si usa Archiviazione BLOB di Azure come archivio di checkpoint, esaminare la versione supportata dell'API di archiviazione di Azure per la compilazione dell'hub di Azure Stack e nel codice specificare come destinazione tale versione.

La versione più recente disponibile del servizio di archiviazione è la versione 2019-02-02. Per impostazione predefinita, la libreria client di Hub eventi SDK usa la versione più elevata disponibile in Azure (2019-07-07 al momento della versione dell'SDK). Se si usa l'hub di Azure Stack versione 2005, oltre a seguire i passaggi descritti in questa sezione, sarà anche necessario aggiungere codice destinato all'API del servizio di archiviazione versione 2019-02-02. Per informazioni su come impostare come destinazione una versione specifica dell'API di archiviazione, vedere questo esempio in GitHub.

Creare un archivio di Azure e un contenitore BLOB

Archiviazione di Azure verrà usato come archivio checkpoint. Usare la procedura seguente per creare un account di archiviazione di Azure.

  1. Creare un account di archiviazione di Azure

  2. Creare un contenitore BLOB

  3. Ottenere il stringa di connessione per l'account di archiviazione

    Prendere nota del stringa di connessione e del nome del contenitore. Verranno usati nel codice di ricezione.

Creare un progetto di Visual Studio per il ricevitore

  1. Nella finestra Esplora soluzioni selezionare e tenere premuto (o fare clic con il pulsante destro del mouse) sulla soluzione EventHubQuickStart, scegliere Aggiungi e selezionare Nuovo progetto.
  2. Selezionare App console (.NET Core) e quindi Avanti.
  3. Immettere PurviewKafkaConsumer come nome del progetto e selezionare Crea.

Aggiungere il pacchetto NuGet di Hub eventi

  1. Selezionare Strumenti>NuGet Package ManagerPackage Manager> Console dal menu.

  2. Eseguire il comando seguente per installare il pacchetto NuGet Azure.Messaging.EventHubs :

    Install-Package Azure.Messaging.EventHubs
    
  3. Eseguire il comando seguente per installare il pacchetto NuGet Azure.Messaging.EventHubs.Processor :

    Install-Package Azure.Messaging.EventHubs.Processor
    

Aggiornare il metodo Main

  1. Aggiungere le istruzioni seguenti using all'inizio del file Program.cs .

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Aggiungere costanti alla Program classe per l'stringa di connessione hub eventi e il nome dell'hub eventi. Sostituire i segnaposto tra parentesi quadre con i valori reali ottenuti quando è stato creato l'hub eventi e l'account di archiviazione (chiavi di accesso - stringa di connessione primaria). Assicurarsi che sia {Event Hubs namespace connection string} il stringa di connessione a livello di spazio dei nomi e non la stringa dell'hub eventi.

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Usare ATLAS_ENTITIES come nome dell'hub eventi quando si inviano messaggi a Microsoft Purview.

  3. Sostituire il Main metodo con il metodo seguente async Main . Per informazioni dettagliate, vedere i commenti nel codice.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. Aggiungere ora alla classe i seguenti metodi di gestione degli eventi e degli errori.

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. Generare il progetto. Verificare che non vi siano errori.

    Nota

    Per il codice sorgente completo con altri commenti informativi, vedere questo file in GitHub.

  6. Eseguire l'applicazione ricevitore.

Esempio di messaggio ricevuto da Microsoft Purview

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

Passaggi successivi

Vedere altri esempi in GitHub.