共用方式為


Azure 事件中樞 .NET 的用戶端程式庫 - 5.8.1 版

Azure 事件中樞是可高度調整的發佈訂閱服務,每秒可以擷取數百萬個事件,並將其串流至多個取用者。 這可讓您處理和分析連線裝置和應用程式所產生的大量資料。 一旦事件中樞收集資料,您就可以使用任何即時分析提供者或批次處理/儲存體配接器來擷取、轉換及儲存資料。 如果您想要深入瞭解Azure 事件中樞,您可以檢閱:什麼是事件中樞

Azure 事件中樞用戶端程式庫可讓您發佈和取用 Azure 事件中樞事件,而且可用來:

  • 發出您應用程式的相關遙測,以供商業智慧和診斷之用。

  • 發佈有關您應用程式狀態的事實,而有興趣的對象可以查看並用來作為採取動作的觸發程序。

  • 查看您業務或其他生態系統內有興趣的作業和互動,讓鬆散結合的系統能夠進行互動,而不需要將其繫結在一起。

  • 接收來自一個或多個發行者的事件、將其轉換以更符合您生態系統的需求,然後將轉換後的事件發佈至新的串流,供取用者查看。

| 原始程式碼套件 (NuGet) | API 參考檔 | 產品檔 | 移轉指南 | 疑難排解指南

開始使用

Prerequisites

  • Azure 訂用帳戶:若要使用 Azure 服務,包括Azure 事件中樞,您需要訂用帳戶。 如果您沒有現有的 Azure 帳戶,您可以在建立帳戶時註冊免費試用或使用Visual Studio 訂用帳戶權益。

  • 具有事件中樞的事件中樞命名空間:若要與Azure 事件中樞互動,您也必須提供命名空間和事件中樞。 如果您不熟悉建立 Azure 資源,建議您遵循使用 Azure 入口網站 建立事件中樞的逐步指南。 您也可以在該處找到使用 Azure CLI、Azure PowerShell或 Azure Resource Manager (ARM) 範本來建立事件中樞的詳細指示。

  • C# 8.0:Azure 事件中樞用戶端程式庫會使用 C# 8.0 中引進的新功能。 若要利用 C# 8.0 語法,建議您使用 .NET Core SDK 3.0 或更新版本編譯, 語言版本latest

    想要充分利用 C# 8.0 語法的 Visual Studio 使用者必須使用 Visual Studio 2019 或更新版本。 Visual Studio 2019 (包括免費的 Community 版) 可以在這裡下載。 Visual Studio 2017 的使用者可以利用 C# 8 語法,方法是使用 Microsoft.Net.Compilers NuGet 套件 和設定語言版本,但編輯體驗可能不理想。

    您仍然可以搭配舊版 C# 語言使用程式庫,但必須手動管理非同步可列舉和非同步可處置成員,而不是受益于新的語法。 您仍然可以以 .NET Core SDK 支援的任何架構版本為目標,包括舊版 .NET Core 或 .NET Framework。 如需詳細資訊,請參閱: 如何指定目標架構
    重要注意事項: 若要在不修改的情況下建置或執行 範例範例 ,必須使用 C# 11.0。 如果您決定調整其他語言版本,您仍然可以執行範例。 範例中提供執行此動作的範例:舊版語言。

若要在 Azure 中快速建立一組基本的事件中樞資源,並接收它們的連接字串,您可以按一下下列命令來部署我們的範例範本:

部署至 Azure

安裝套件

使用NuGet安裝適用于 .NET 的 Azure 事件中樞用戶端程式庫:

dotnet add package Azure.Messaging.EventHubs

驗證用戶端

若要讓事件中樞用戶端程式庫與事件中樞互動,必須瞭解如何與其連線和授權。 最簡單的方式是使用連接字串,這會在建立事件中樞命名空間時自動建立。 如果您不熟悉搭配事件中樞使用連接字串,您可以依照逐步指南取得 事件中樞連接字串

重要概念

  • 事件中樞用戶端是開發人員與事件中樞用戶端程式庫互動的主要介面。 有數個不同的事件中樞用戶端,其各自專用於特定事件中樞的使用,例如發佈或取用事件。

  • 事件中樞產生者是一種用戶端,可作為遙測資料、診斷資訊、使用記錄或其他記錄資料的來源,做為內嵌裝置解決方案的一部分、行動裝置應用程式、在主控台或其他裝置上執行的遊戲標題、某些用戶端或伺服器型商務解決方案或網站。

  • 事件中樞取用者是一種用戶端,可從事件中樞讀取資訊,並允許進行處理。 處理時可能涉及彙總、複雜的計算和篩選。 處理也可能涉及以未經處理的方式或經過轉換的方式散發或儲存資訊。 事件中樞取用者通常是強固且大規模的平台基礎結構組件,且具有內建分析功能,例如 Azure 串流分析、Apache Spark 或 Apache Storm。

  • 資料分割是經過排序且保存在事件中樞內的事件序列。 資料分割是與事件取用者所需平行處理原則相關聯的資料組織方式。 Azure 事件中樞透過分割取用者模式來提供訊息串流,每位取用者只會讀取訊息串流的特定子集 (即資料分割)。 當較新的事件送達時,系統會將它們加入序列的結尾。 建立事件中樞時會指定資料分割數目,且無法加以變更。

  • 取用者群組可讓您檢視整個事件中樞。 取用者群組能讓多個取用應用程式擁有自己的事件串流檢視,以及按照自己的步調和從自己的位置自行讀取串流。 每一取用者群組的一個資料分割上最多可以有 5 個並行讀取器;不過,建議給定的資料分割和取用者群組配對只有一個作用中的取用者。 每個作用中的讀者都會從其資料分割收到所有事件;如果相同資料分割上有多個讀者,則其會收到重複的事件。

如需更多概念和更深入的討論,請參閱: 事件中樞功能

用戶端存留期

每個事件中樞用戶端類型都可以安全地快取並使用作為應用程式的存留期,這是定期發佈或讀取事件時的最佳做法。 用戶端負責有效率地管理網路、CPU 和記憶體使用量,以在閒置期間保持低使用量。 CloseAsync您必須在用戶端上呼叫 或 DisposeAsync ,以確保已正確清除網路資源和其他 Unmanaged 物件。

執行緒安全

我們保證所有用戶端實例方法都是安全線程,且彼此獨立 (指導方針) 。 這可確保重複使用用戶端實例的建議一律是安全的,即使是跨執行緒也一樣。

資料模型類型,例如 EventDataEventDataBatch 不是安全線程。 它們不應該跨執行緒共用,也不應該與用戶端方法同時使用。

其他概念

用戶端選項 | 處理失敗 | 診斷 | 嘲笑

範例

检查事件中心

许多事件中心操作都在特定分区范围内进行。 由於分割區是由事件中樞所擁有,因此會在建立時指派其名稱。 要了解哪些分区可用,可以使用一个事件中心客户端查询事件中心。 這些範例中示範了 EventHubProducerClient 作為說明,但概念和形式可在用戶端之間通用。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    string[] partitionIds = await producer.GetPartitionIdsAsync();
}

将事件发布到事件中心

為了發佈事件,您將必須建立 EventHubProducerClient。 生成者分批发布事件,并可能请求特定分区,或允许事件中心服务决定应将事件发布到哪些分区。 當發行事件需要高度可用或事件資料應該平均分散在分割區之間時,建議使用自動路由。 我們的範例會利用自動路由。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

从事件中心读取事件

為了從事件中樞讀取事件,您必須為指定的取用者群組建立 EventHubConsumerClient。 建立事件中樞時,其會提供可用來開始探索事件中樞的預設取用者群組。 在我們的範例中,我們將著重於使用迭代器讀取已發佈至事件中樞的所有事件。

注意: 請務必注意,這個取用方法旨在改善探索事件中樞用戶端程式庫和原型設計的體驗。 建議不要在生產案例中使用。 針對生產環境使用,我們建議使用事件處理器用戶端,因為其提供更可靠且高效能的體驗。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsAsync(cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the Event Hub.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

從事件中樞分割區讀取事件

若要讀取事件中樞資料分割的事件,您必須為指定的取用者群組建立 EventHubConsumerClient 。 建立事件中樞時,其會提供可用來開始探索事件中樞的預設取用者群組。 若要從特定分割區讀取,取用者也必須指定事件資料流程中開始接收事件的位置;在我們的範例中,我們將著重于讀取事件中樞第一個分割區的所有已發佈事件。

var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";

string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

await using (var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName))
{
    EventPosition startingPosition = EventPosition.Earliest;
    string partitionId = (await consumer.GetPartitionIdsAsync()).First();

    using var cancellationSource = new CancellationTokenSource();
    cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

    await foreach (PartitionEvent receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
    {
        // At this point, the loop will wait for events to be available in the partition.  When an event
        // is available, the loop will iterate with the event that was received.  Because we did not
        // specify a maximum wait time, the loop will wait forever unless cancellation is requested using
        // the cancellation token.
    }
}

使用事件處理器用戶端處理事件

在大部分的生產案例中,建議使用 事件處理器用戶端 來讀取和處理事件。 處理器旨在提供強固的體驗,以高效能且容錯的方式處理事件中樞的所有分割區,同時提供保存其狀態的方法。 事件處理器用戶端也能夠在指定事件中樞的取用者群組內容中合作運作,這些用戶端會自動管理工作散發和平衡,因為實例可供使用或無法使用群組。

由於 EventProcessorClient 相依於 Azure 儲存體 Blob 以維持其狀態,因此您必須為處理器提供 BlobContainerClient,而其已針對應使用的儲存體帳戶和容器進行設定。

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

您可以在事件處理器用戶端 讀我檔案 和隨附 的範例中找到更多詳細資料。

搭配事件中樞用戶端使用 Active Directory 主體

Azure 身分識別程式庫提供 Azure Active Directory 驗證支援,可用於 Azure 用戶端程式庫,包括事件中樞。

若要使用 Active Directory 主體,建立事件中樞用戶端時會指定連結 Azure.Identity 庫的其中一個可用認證。 此外,會提供完整的事件中樞命名空間和所需事件中樞的名稱,而不是事件中樞連接字串。 這些範例中示範了 EventHubProducerClient 作為說明,但概念和形式可在用戶端之間通用。

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var credential = new DefaultAzureCredential();

await using (var producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
    eventBatch.TryAdd(new EventData(new BinaryData("First")));
    eventBatch.TryAdd(new EventData(new BinaryData("Second")));

    await producer.SendAsync(eventBatch);
}

使用 Azure Active Directory 時,您的主體必須獲指派一個角色,以允許存取事件中樞,例如 Azure Event Hubs Data Owner 角色。 如需搭配事件中樞使用 Azure Active Directory 授權的詳細資訊,請參閱 相關聯的檔

疑難排解

如需詳細的疑難排解資訊,請參閱 事件中樞疑難排解指南

記錄和診斷

事件中樞用戶端程式庫會使用 .NET EventSource 發出資訊,完整檢測各種詳細層級的資訊。 記錄會針對每個作業執行,並遵循標記作業起點、完成和發生任何例外狀況的模式。 可能提供深入解析的其他資訊也會記錄在相關聯作業的內容中。

事件中樞用戶端記錄可供任何 EventListener 使用,方法是加入宣告名為 「Azure-Messaging-EventHubs」 的來源,或加入宣告具有 「AzureEventSource」 特性的所有來源。 為了更輕鬆地從 Azure 用戶端程式庫擷取記錄, Azure.Core 事件中樞所使用的程式庫會提供 AzureEventSourceListener 。 如需詳細資訊,請參閱 使用 AzureEventSourceListener 擷取事件中樞記錄

事件中樞用戶端程式庫也會使用 Application Insights 或 OpenTelemetry 來檢測分散式追蹤。 如需詳細資訊,請參閱 Azure.Core 診斷範例

下一步

除了討論的簡介案例之外,Azure 事件中樞用戶端程式庫提供其他案例的支援,以協助利用Azure 事件中樞服務的完整功能集。 為了協助探索其中一些案例,事件中樞用戶端程式庫提供範例專案,作為常見案例的圖例。 如需詳細資訊,請參閱自 述檔 範例。

參與

此專案歡迎參與和提供建議。 大部分的參與都要求您同意「參與者授權合約 (CLA)」,宣告您有權且確實授與我們使用投稿的權利。 如需詳細資料,請前往 https://cla.microsoft.com

當您提交提取要求時,CLA Bot 會自動判斷您是否需要提供 CLA,並適當地裝飾 PR (例如標籤、註解)。 請遵循 bot 提供的指示。 您只需要使用我們的 CLA 在所有存放庫上執行此動作一次。

此專案採用 Microsoft Open Source Code of Conduct (Microsoft 開放原始碼管理辦法)。 如需詳細資訊,請參閱管理辦法常見問題集,如有任何其他問題或意見請連絡 opencode@microsoft.com

如需詳細資訊,請參閱我們的 參與指南

曝光數