Поделиться через


Использование Центров событий и .NET для отправки и получения сообщений в разделах Atlas Kafka

В этом кратком руководстве описано, как отправлять и получать события тем Atlas Kafka . Мы будем использовать Центры событий Azure и библиотеку Azure.Messaging.EventHubs .NET.

Предварительные условия

Если вы не знакомы с Центрами событий, ознакомьтесь с разделом Общие сведения о Центрах событий , прежде чем выполнять работу с этим кратким руководством.

Чтобы выполнить инструкции из этого краткого руководства, необходимо выполнить определенные предварительные требования.

Публикация сообщений в Microsoft Purview

Давайте создадим консольное приложение .NET Core, которое отправляет события в Microsoft Purview через раздел Kafka Центров событий, ATLAS_HOOK.

Чтобы опубликовать сообщения в Microsoft Purview, вам потребуется либо управляемые Центры событий, либо по крайней мере один Концентратор событий с конфигурацией перехватчика.

Создание проекта Visual Studio

Далее создайте консольное приложение .NET C# в Visual Studio:

  1. Запустите Visual Studio.
  2. В окне Пуск выберите Создать консольное приложение проекта>(платформа .NET Framework). Требуется .NET версии 4.5.2 или более поздней.
  3. В поле Имя проекта введите PurviewKafkaProducer.
  4. Выберите Создать , чтобы создать проект.

Создание консольного приложения

  1. Запустите Visual Studio 2022.
  2. Выберите Создать новый проект.
  3. В диалоговом окне Создание проекта выполните следующие действия: Если это диалоговое окно не отображается, выберите Файл в меню, выберите Создать, а затем — Проект.
    1. Выберите C# для языка программирования.
    2. Выберите Консоль для типа приложения.
    3. Выберите Консольное приложение (.NET Core) в списке результатов.
    4. Затем выберите Далее.

Добавление пакета NuGet Центров событий

  1. В меню выберите Инструменты>Диспетчер пакетов>NuGet.

  2. Выполните следующую команду, чтобы установить пакет NuGet Azure.Messaging.EventHubs и пакет NuGet Azure.Messaging.EventHubs.Producer :

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

Написание кода, который отправляет сообщения в концентратор событий

  1. Добавьте следующие using инструкции в начало файла Program.cs :

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Добавьте константы в Program класс для строка подключения Центров событий и имени Центров событий.

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Замените Main метод следующим async Main методом async ProduceMessage и добавьте для отправки сообщений в Microsoft Purview. Дополнительные сведения см. в комментариях к коду.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. Выполните построение проекта. Убедитесь, что ошибок нет.

  5. Запустите программу и дождитесь сообщения о подтверждении.

    Примечание.

    Полный исходный код с дополнительными информационными комментариями см. в этом файле в GitHub.

Пример кода, который создает таблицу SQL с двумя столбцами с помощью сообщения JSON Create Entity

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Получение сообщений Microsoft Purview

Далее узнайте, как создать консольное приложение .NET Core, которое получает сообщения из центров событий с помощью обработчика событий. Обработчик событий управляет постоянными контрольными точками и параллельными приемами из концентраторов событий. Это упрощает процесс получения событий. Для получения сообщений из Microsoft Purview необходимо использовать концентратор событий ATLAS_ENTITIES.

Чтобы получать сообщения из Microsoft Purview, вам потребуется либо управляемые Центры событий, либо конфигурация уведомлений Центров событий.

Предупреждение

Пакет SDK центров событий использует последнюю доступную версию API хранилища. Эта версия может быть недоступна на платформе Stack Hub. При выполнении этого кода в Azure Stack Hub возникают ошибки среды выполнения, если только вы не используете конкретную версию. Если вы используете Хранилище BLOB-объектов Azure в качестве хранилища контрольных точек, просмотрите поддерживаемую версию API службы хранилища Azure для сборки Azure Stack Hub и в коде нацелитесь на эту версию.

Самая высокая доступная версия службы хранилища — 2019-02-02. По умолчанию клиентская библиотека SDK центров событий использует самую высокую доступную версию в Azure (2019-07-07 на момент выпуска пакета SDK). Если вы используете Azure Stack Hub версии 2005, помимо действий, описанных в этом разделе, вам также потребуется добавить код, предназначенный для API службы хранилища версии 2019-02-02.02. Сведения о том, как использовать определенную версию API хранилища, см. в этом примере в GitHub.

Создание хранилища Azure и контейнера BLOB-объектов

Мы будем использовать службу хранилища Azure в качестве хранилища контрольных точек. Чтобы создать учетную запись хранения Azure, выполните следующие действия.

  1. Создание учетной записи хранилища Azure

  2. Создание контейнера BLOB-объектов

  3. Получение строка подключения для учетной записи хранения

    Запишите строка подключения и имя контейнера. Вы будете использовать их в коде получения.

Создание проекта Visual Studio для получателя

  1. В окне Обозреватель решений выберите и удерживайте (или щелкните правой кнопкой мыши) решение EventHubQuickStart, наведите указатель мыши на пункт Добавить и выберите Создать проект.
  2. Выберите Консольное приложение (.NET Core) и нажмите кнопку Далее.
  3. Введите PurviewKafkaConsumer в поле Имя проекта и нажмите кнопку Создать.

Добавление пакета NuGet Центров событий

  1. В меню выберите Инструменты>Диспетчер пакетов>NuGet.

  2. Выполните следующую команду, чтобы установить пакет NuGet Azure.Messaging.EventHubs :

    Install-Package Azure.Messaging.EventHubs
    
  3. Выполните следующую команду, чтобы установить пакет NuGet Azure.Messaging.EventHubs.Processor :

    Install-Package Azure.Messaging.EventHubs.Processor
    

Обновление метода Main

  1. Добавьте следующие using инструкции в начало файла Program.cs .

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Добавьте константы в Program класс для строка подключения Центров событий и имя концентратора событий. Замените заполнители в квадратных скобках реальными значениями, которые вы получили при создании концентратора событий и учетной записи хранения (ключи доступа — первичные строка подключения). Убедитесь, что {Event Hubs namespace connection string} является строка подключения уровня пространства имен, а не строкой концентратора событий.

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Используйте ATLAS_ENTITIES в качестве имени концентратора событий при отправке сообщений в Microsoft Purview.

  3. Замените Main метод следующим async Main методом. Дополнительные сведения см. в комментариях к коду.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. Теперь добавьте в класс следующие методы обработчика событий и ошибок.

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. Выполните построение проекта. Убедитесь, что ошибок нет.

    Примечание.

    Полный исходный код с дополнительными информационными комментариями см. в этом файле на сайте GitHub.

  6. Запустите приложение-получатель.

Пример сообщения, полученного от Microsoft Purview

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

Дальнейшие действия

Дополнительные примеры см. в GitHub.