使用事件中樞和 .NET 來傳送和接收 Atlas Kafka 主題訊息
本快速入門會教導您如何傳送和接收 Atlas Kafka 主題事件。 我們將使用 Azure 事件中樞 和 Azure.Messaging.EventHubs .NET 連結庫。
必要條件
如果您不熟悉事件中樞,請先參閱 事件中樞概觀 ,再完成本快速入門。
若要遵循本快速入門,您需要具備特定必要條件:
- Microsoft Azure 訂用帳戶。 若要使用包括事件中樞在內的 Azure 服務,您需要 Azure 訂用帳戶。 如果您沒有 Azure 帳戶,您可以註冊 免費試 用,或在 建立帳戶時使用 MSDN 訂閱者權益。
-
Microsoft Visual Studio 2022。 事件中樞客戶端連結庫會使用 C# 8.0 中引進的新功能。 您仍然可以搭配舊版 C# 使用連結庫,但無法使用新的語法。 若要使用完整的語法,建議您使用 .NET Core SDK 3.0 或更高版本進行編譯,並將 語言版本 設定為
latest
。 如果您使用 Visual Studio 2019 之前的 Visual Studio 版本,則沒有建置 C# 8.0 專案所需的工具。 您可以 在這裡下載 Visual Studio 2022,包括免費的 Community 版本。 - 作用 中Microsoft Purview 帳戶。
- 使用您的 Microsoft Purview 帳戶設定的事件中樞,可傳送和接收訊息:
將訊息發佈至 Microsoft Purview
讓我們建立 .NET Core 控制台應用程式,透過事件中樞 Kafka 主題將事件傳送至 Microsoft Purview,ATLAS_HOOK。
若要將訊息發佈至 Microsoft Purview,您需要一個受控 事件中樞,或 至少一個具有攔截組態的事件中樞。
建立 Visual Studio 專案
接下來,在 Visual Studio 中建立 C# .NET 控制台應用程式:
- 啟動 Visual Studio。
- 在 [開始] 視窗中,選取 [建立新的專案>主控台應用程式 (.NET Framework) 。 需要 .NET 4.5.2 版或更新版本。
- 在 [項目名稱] 中,輸入 PurviewKafkaProducer。
- 選 取 [建立 ] 以建立專案。
建立主控台應用程式
- 啟動 Visual Studio 2022。
- 選 取 [建立新專案]。
- 在 [ 建立新專案 ] 對話框上,執行下列步驟:如果您沒有看到此對話框,請選取功能表上的 [ 檔案 ],選取 [ 新增],然後選取 [ 專案]。
- 選取 C# 作為程式設計語言。
- 選 取 [主控台 ] 作為應用程式的類型。
- 從結果清單中選取 [控制台應用程式] (.NET [核心) ]。
- 然後,選取 [下一步]。
新增事件中樞 NuGet 套件
從功能表中選取 [工具>NuGet 套件管理員>套件管理員主控台 ]。
執行下列命令來安裝 Azure.Messaging.EventHubs NuGet 套件和 Azure.Messaging.EventHubs.Producer NuGet 套件:
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
撰寫將訊息傳送至事件中樞的程序代碼
將下列
using
語句新增至 Program.cs 檔案的頂端:using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
將常數新增至
Program
事件中樞 連接字串 和事件中樞名稱的 類別。private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
將方法
Main
取代為下列async Main
方法,async ProduceMessage
並新增 以將訊息推送至 Microsoft Purview。 如需詳細資訊,請參閱程序代碼中的批註。static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
建置專案。 請確定沒有任何錯誤。
執行程式並等候確認訊息。
注意事項
如需具有詳細資訊批註的完整原始程式碼,請參閱 GitHub 中的此檔案
使用建立實體 JSON 訊息建立具有兩個數據行之 sql 數據表的範例程式代碼
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
接收 Microsoft Purview 訊息
接下來,瞭解如何撰寫使用事件處理器從事件中樞接收訊息的 .NET Core 控制台應用程式。 事件處理器會管理來自事件中樞的持續性檢查點和平行接收。 這可簡化接收事件的程式。 您必須使用ATLAS_ENTITIES事件中樞來接收來自 Purview Microsoft訊息。
若要從 Microsoft Purview 接收訊息,您需要受控 事件中樞或 事件中樞通知組態。
警告
事件中樞 SDK 使用最新版的記憶體 API。 該版本不一定可在您的 Stack Hub 平臺上使用。 如果您在 Azure Stack Hub 上執行此程式代碼,除非您以所使用的特定版本為目標,否則會發生運行時錯誤。 如果您使用 Azure Blob 儲存體 作為檢查點存放區,請檢閱 Azure Stack Hub 組建支援的 Azure 記憶體 API 版本,並在您的程式代碼中以該版本為目標。
記憶體服務的最高可用版本是 2019-02-02 版。 根據預設,在 SDK) 發行時,事件中樞 SDK 用戶端連結庫會在 Azure (2019-07-07 上使用最高可用版本。 如果您使用 Azure Stack Hub 2005 版,除了遵循本節中的步驟之外,您還需要新增以記憶體服務 API 2019-02-02 版為目標的程式代碼。 若要瞭解如何以特定記憶體 API 版本為目標,請參閱 GitHub 中的此範例。
建立 Azure 記憶體和 Blob 容器
我們將使用 Azure 記憶體作為檢查點存放區。 使用下列步驟來建立 Azure 記憶體帳戶。
-
記下 連接字串 和容器名稱。 您會在接收程式代碼中使用它們。
為接收者建立Visual Studio專案
- 在 [方案總管] 視窗中,選取並按住 (,或以滑鼠右鍵按兩下 EventHubQuickStart 方案) ,指向 [新增],然後選取 [新增專案]。
- 選 取 [控制台應用程式] (.NET [核心) ],然後選取 [ 下一步]。
- 輸入 PurviewKafkaConsumer 作為 [項目名稱],然後選取 [ 建立]。
新增事件中樞 NuGet 套件
從功能表中選取 [工具>NuGet 套件管理員>套件管理員主控台 ]。
執行下列命令以安裝 Azure.Messaging.EventHubs NuGet 套件:
Install-Package Azure.Messaging.EventHubs
執行下列命令以安裝 Azure.Messaging.EventHubs.Processor NuGet 套件:
Install-Package Azure.Messaging.EventHubs.Processor
更新Main方法
在Program.cs檔案頂端新增下列
using
語句。using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
將常數新增至
Program
事件中樞 連接字串 和事件中樞名稱的 類別。 將括弧中的佔位元取代為您在建立事件中樞時取得的實際值,並將記憶體帳戶 (存取密鑰 - 主要 連接字串) 。 請確定{Event Hubs namespace connection string}
是命名空間層級 連接字串,而不是事件中樞字串。private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
將 訊息傳送至 Microsoft Purview 時,請使用 ATLAS_ENTITIES 作為事件中樞名稱。
Main
將方法取代為下列async Main
方法。 如需詳細資訊,請參閱程序代碼中的批註。static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
現在,將下列事件和錯誤處理程式方法新增至 類別。
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
建置專案。 請確定沒有任何錯誤。
注意事項
如需具有詳細資訊批注的完整原始程式碼,請參閱 GitHub 上的此檔案。
執行接收者應用程式。
從 Purview Microsoft接收的訊息範例
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
後續步驟
查看 GitHub 中的更多範例。