Compartilhar via


Utilizar hubs de eventos e .NET para enviar e receber mensagens de tópicos do Atlas Kafka

Este início rápido ensina-o a enviar e receber eventos de tópicos do Atlas Kafka . Vamos utilizar o Hubs de Eventos do Azure e a biblioteca .NET Azure.Messaging.EventHubs.

Pré-requisitos

Se não estiver familiarizado com os Hubs de Eventos, veja Descrição geral dos Hubs de Eventos antes de concluir este início rápido.

Para seguir este início rápido, precisa de determinados pré-requisitos:

  • Uma subscrição do Microsoft Azure. Para utilizar os serviços do Azure, incluindo os Hubs de Eventos, precisa de uma subscrição do Azure. Se não tiver uma conta do Azure, pode inscrever-se numa avaliação gratuita ou utilizar os benefícios de subscritor do MSDN quando criar uma conta.
  • Microsoft Visual Studio 2022. A biblioteca de cliente dos Hubs de Eventos utiliza as novas funcionalidades que foram introduzidas no C# 8.0. Ainda pode utilizar a biblioteca com versões C# anteriores, mas a nova sintaxe não estará disponível. Para utilizar a sintaxe completa, recomenda-se que compile com o SDK .NET Core 3.0 ou superior e a versão de idioma definida como latest. Se estiver a utilizar uma versão do Visual Studio anterior ao Visual Studio 2019, não tem as ferramentas necessárias para criar projetos C# 8.0. O Visual Studio 2022, incluindo a edição Community gratuita, pode ser transferido aqui.
  • Uma conta ativa do Microsoft Purview.
  • Um Hub de Eventos configurado com a sua conta do Microsoft Purview para enviar e receber mensagens:
    • A sua conta pode já estar configurada. Pode marcar sua conta do Microsoft Purview no portal do Azure em Definições, configuração do Kafka. Se ainda não estiver configurado, siga este guia.

Publicar mensagens no Microsoft Purview

Vamos criar uma aplicação de consola .NET Core que envia eventos para o Microsoft Purview através do tópico Kafka dos Hubs de Eventos , ATLAS_HOOK.

Para publicar mensagens no Microsoft Purview, precisará de um Hub de Eventos gerido ou de, pelo menos, um Hub de Eventos com uma configuração de hook..

Criar um projeto do Visual Studio

Em seguida, crie uma aplicação de consola .NET C# no Visual Studio:

  1. Inicie o Visual Studio.
  2. Na janela Iniciar, selecione Criar um novo projeto>Aplicação de Consola (.NET Framework). É necessária a versão 4.5.2 ou superior do .NET.
  3. Em Nome do projeto, introduza PurviewKafkaProducer.
  4. Selecione Criar para criar o projeto.

Criar uma aplicação de consola

  1. Inicie o Visual Studio 2022.
  2. Selecione Criar um novo projeto.
  3. Na caixa de diálogo Criar um novo projeto , siga os seguintes passos: se não vir esta caixa de diálogo, selecione Ficheiro no menu, selecione Novo e, em seguida, selecione Projeto.
    1. Selecione C# para a linguagem de programação.
    2. Selecione Consola para o tipo de aplicação.
    3. Selecione Aplicação de Consola (.NET Core) na lista de resultados.
    4. Em seguida, selecione Avançar.

Adicionar o pacote NuGet dos Hubs de Eventos

  1. Selecione Ferramentas>Consola do Gestor de Pacotes nuGet Gestor> dePacotesno menu.

  2. Execute o seguinte comando para instalar o pacote NuGet Azure.Messaging.EventHubs e o pacote NuGet Azure.Messaging.EventHubs.Producer :

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

Escrever código que envia mensagens para o hub de eventos

  1. Adicione as seguintes using instruções à parte superior do ficheiro Program.cs :

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Adicione constantes à Program classe para os Hubs de Eventos cadeia de conexão e o nome dos Hubs de Eventos.

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Substitua o Main método pelo seguinte async Main método e adicione um async ProduceMessage para enviar mensagens para o Microsoft Purview. Veja os comentários no código para obter detalhes.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. Compile o projeto. Assegure-se de que não existem erros.

  5. Execute o programa e aguarde pela mensagem de confirmação.

    Observação

    Para obter o código fonte completo com comentários mais informativos, veja este ficheiro no GitHub

Código de exemplo que cria uma tabela sql com duas colunas através de uma mensagem Criar Entidade JSON

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Receber mensagens do Microsoft Purview

Em seguida, saiba como escrever uma aplicação de consola .NET Core que recebe mensagens de hubs de eventos com um processador de eventos. O processador de eventos gere pontos de verificação persistentes e recepções paralelas a partir de hubs de eventos. Isto simplifica o processo de receção de eventos. Tem de utilizar o hub de eventos ATLAS_ENTITIES para receber mensagens do Microsoft Purview.

Para receber mensagens do Microsoft Purview, precisará de um Hub de Eventos gerido ou de uma configuração de notificação dos Hubs de Eventos.

Aviso

O SDK dos Hubs de Eventos utiliza a versão mais recente da API de Armazenamento disponível. Essa versão pode não estar necessariamente disponível na sua plataforma do Stack Hub. Se executar este código no Azure Stack Hub, ocorrerá erros de runtime, a menos que tenha como destino a versão específica que está a utilizar. Se estiver a utilizar Armazenamento de Blobs do Azure como um arquivo de pontos de verificação, reveja a versão suportada da API de Armazenamento do Azure para a compilação do Azure Stack Hub e, no código, direcione essa versão.

A versão mais elevada disponível do serviço de Armazenamento é a versão 2019-02-02. Por predefinição, a biblioteca de cliente do SDK dos Hubs de Eventos utiliza a versão mais elevada disponível no Azure (2019-07-07 no momento do lançamento do SDK). Se estiver a utilizar a versão 2005 do Azure Stack Hub, além de seguir os passos nesta secção, também terá de adicionar código que se destina à versão da API do Serviço de armazenamento 2019-02-02. Para saber como direcionar uma versão específica da API de Armazenamento, veja este exemplo no GitHub.

Criar um Armazenamento do Azure e um contentor de blobs

Vamos utilizar o Armazenamento do Azure como o arquivo de pontos de verificação. Utilize os seguintes passos para criar uma conta de Armazenamento do Azure.

  1. Configurar uma conta de armazenamento do Azure

  2. Criar um contentor de blobs

  3. Obter o cadeia de conexão da conta de armazenamento

    Anote o cadeia de conexão e o nome do contentor. Irá utilizá-los no código de receção.

Criar um projeto do Visual Studio para o recetor

  1. Na janela Gerenciador de Soluções, selecione sem soltar (ou clique com o botão direito do rato) a solução EventHubQuickStart, aponte para Adicionar e selecione Novo Projeto.
  2. Selecione Aplicação de Consola (.NET Core) e selecione Seguinte.
  3. Introduza PurviewKafkaConsumer para o Nome do projeto e selecione Criar.

Adicionar o pacote NuGet dos Hubs de Eventos

  1. Selecione Ferramentas>Consola do Gestor de Pacotes nuGet Gestor> dePacotesno menu.

  2. Execute o seguinte comando para instalar o pacote NuGet Azure.Messaging.EventHubs :

    Install-Package Azure.Messaging.EventHubs
    
  3. Execute o seguinte comando para instalar o pacote NuGet Azure.Messaging.EventHubs.Processor :

    Install-Package Azure.Messaging.EventHubs.Processor
    

Atualizar o método Main

  1. Adicione as seguintes using instruções na parte superior do ficheiro Program.cs .

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Adicione constantes à Program classe para os Hubs de Eventos cadeia de conexão e o nome do hub de eventos. Substitua os marcadores de posição entre parênteses retos pelos valores reais que obteve quando criou o hub de eventos e a conta de armazenamento (chaves de acesso – cadeia de conexão primário). Certifique-se de que o {Event Hubs namespace connection string} é o cadeia de conexão ao nível do espaço de nomes e não a cadeia do hub de eventos.

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Utilize ATLAS_ENTITIES como o nome do hub de eventos ao enviar mensagens para o Microsoft Purview.

  3. Substitua o Main método pelo seguinte async Main método. Veja os comentários no código para obter detalhes.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. Agora, adicione os seguintes métodos de processador de eventos e erros à classe .

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. Compile o projeto. Assegure-se de que não existem erros.

    Observação

    Para obter o código fonte completo com comentários mais informativos, veja este ficheiro no GitHub.

  6. Execute a aplicação recetora.

Um exemplo de uma Mensagem recebida do Microsoft Purview

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

Próximas etapas

Veja mais exemplos no GitHub.