使用 Java 將事件傳送至 Azure 事件中樞或從中接收事件
本快速入門說明如何使用 azure-messaging-eventhubs Java 套件,來傳送事件至事件中樞和從事件中樞接收事件。
提示
如果您在 Spring 應用程式中使用 Azure 事件中樞資源,建議您考慮將 Spring Cloud Azure 作為替代方案。 Spring Cloud Azure 是開放原始碼專案,可提供與 Azure 服務的 Spring 無縫整合。 若要深入瞭解 Spring Cloud Azure,以及查看使用事件中樞的範例,請參閱 使用 Azure 事件中樞的Spring Cloud Stream。
必要條件
如果您對 Azure 事件中樞並不熟悉,在進行此快速入門之前,請先參閱事件中樞概述。
若要完成本快速入門,您必須符合下列必要條件:
- Microsoft Azure 訂用帳戶。 若要使用 Azure 服務 (包括 Azure 事件中樞),您需要訂用帳戶。 如果您沒有現有的 Azure 帳戶,您可以申請免費試用,或是在建立帳戶時使用 MSDN 訂閱者權益。
- Java 開發環境。 本快速入門使用 Eclipse。 需要含第 8 版或更新版本的 Java 開發套件 (JDK)。
- 建立事件中樞命名空間和事件中樞。 第一個步驟是使用 Azure 入口網站來建立「事件中樞」類型的命名空間,然後取得您應用程式與「事件中樞」進行通訊所需的管理認證。 若要建立命名空間和事件中樞,請依照這篇文章中的程序操作。 然後,依照下列文章中的指示,取得事件中樞命名空間的連接字串:取得連接字串。 您稍後會在本快速入門中使用連接字串。
傳送事件
本節說明如何建立可將事件傳送至事件中樞的 Java 應用程式。
加入 Azure 事件中樞程式庫的參考
首先,在您最喜愛的 Java 開發環境中,為主控台/殼層應用程式建立新的 Maven 專案。 請更新 pom.xml
檔案為下列內容。 Maven 中央存放庫中具有適用於事件中樞的 Java 用戶端程式庫。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.18.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.11.2</version>
<scope>compile</scope>
</dependency>
注意
將版本更新為發佈至 Maven 存放庫的最新版本。
向 Azure 驗證應用程式
本快速入門顯示兩種連線到 Azure 事件中樞的方式:無密碼和連接字串。 第一個選項如何使用 Microsoft Entra ID 和顯示角色型存取控制 (RBAC) 中的安全性主體來連線到事件中樞命名空間。 您不需要擔心在程式碼或設定檔或 Azure Key Vault 等安全儲存體中,有硬式編碼連接字串。 第二個選項顯示如何使用連接字串來連線到事件中樞命名空間。 如果您不熟悉 Azure,則連接字串選項可能會更容易遵循。 建議在真實世界應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概觀頁面上,深入了解無密碼驗證。
將角色指派給 Microsoft Entra 使用者
在本機開發時,請確定連線到 Azure 事件中樞的使用者帳戶具有正確的權限。 您需要 Azure 事件中樞資料擁有者角色,才能傳送和接收訊息。 若要將此角色指派給您自己,您需要使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write
動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上,深入了解角色指派的可用範圍。
下列範例會將 Azure Event Hubs Data Owner
角色指派給您的使用者帳戶,該角色提供對 Azure 事件中樞資源的完整存取權。 在實際案例中,遵循最低權限原則,只為使用者提供更安全實際執行環境所需的最低權限。
Azure 事件中樞的 Azure 內建角色
對於 Azure 事件中樞來說,透過 Azure 入口網站和 Azure 資源管理 API 來管理的命名空間和所有相關資源,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取事件中樞命名空間:
- Azure 事件中樞資料擁有者:允許資料存取事件中樞命名空間及其實體 (佇列、主題、訂用帳戶和篩選)
- Azure 事件中樞資料傳送者:使用此角色可讓傳送者存取事件中樞命名空間及其實體。
- Azure 事件中樞資料接收者:使用此角色可讓接收者存取事件中樞命名空間及其實體。
如果您想要建立自訂角色,請參閱事件中樞作業所需的權限。
重要
在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘。 在罕見的情況下,可能需要高達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。
在 Azure 入口網站中,使用主要搜尋列或左側導覽找出您的事件中樞命名空間。
在概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]。
在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。
從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]。
使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋
Azure Event Hubs Data Owner
並選取相符的結果。 接著,選擇 [下一步]。在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]。
在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]。
選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。
撰寫程式碼以將訊息傳送到事件中樞
新增名為 Sender
的類別,並在此類別中新增下列程式碼:
重要
- 將
<NAMESPACE NAME>
更新為您的事件中樞命名空間名稱。 - 將
<EVENT HUB NAME>
更新為您的事件中樞名稱。
package ehubquickstart;
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
import com.azure.identity.*;
public class SenderAAD {
// replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
// Example: private static final String namespaceName = "contosons.servicebus.windows.net";
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
// Replace <EVENT HUB NAME> with the name of your event hub.
// Example: private static final String eventHubName = "ordersehub";
private static final String eventHubName = "<EVENT HUB NAME>";
public static void main(String[] args) {
publishEvents();
}
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a token using the default Azure credential
DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
.authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
.build();
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace(namespaceName)
.eventHubName(eventHubName)
.credential(credential)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
}
建置程式,並確定沒有任何錯誤。 執行接收者程式之後,您將會執行此程式。
接收事件
本教學課程中的程式碼是根據 GitHub 上的 EventProcessorClient 程式碼,您可以檢查該程式碼以查看完整的運作中應用程式。
使用 Azure Blob 儲存體作為檢查點存放區時,請遵循下列建議:
- 針對每個取用者群組使用不同的容器。 您可以使用相同的儲存體帳戶,但每個群組各使用一個容器。
- 請勿將容器用於其他任何項目,也不會將儲存體帳戶用於其他任何項目。
- 儲存體帳戶應位於與已部署應用程式所在的相同區域中。 如果應用程式是內部部署,請嘗試選擇最接近的區域。
在 Azure 入口網站的 [儲存體帳戶] 頁面上,於 [Blob 服務] 區段中,確定已停用下列設定。
- 階層式命名空間
- Blob 虛刪除
- 版本控制
建立 Azure 儲存體 Blob 容器
在本快速入門中,您會使用 Azure 儲存體 (明確來說是 Blob 儲存體) 作為檢查點存放區。 檢查點是事件處理器用來標記或認可資料分割內於最後成功處理事件的程序。 標記檢查點通常是在處理事件的函式中完成。 若要深入了解檢查點,請參閱事件處理器。
請遵循這些步驟來建立 Azure 儲存體帳戶。
- 建立 Azure 儲存體帳戶
- 建立 Blob 容器
- 向 Blob 容器進行驗證
在本機開發時,請確定存取 Blob 資料的使用者帳戶具有正確的權限。 您需要儲存體 Blob 資料參與者才能讀取和寫入 Blob 資料。 若要指派此角色給您自己,您需要被指派使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write 動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上深入了解角色指派的可用範圍。
在此案例中,您會將權限指派給使用者帳戶 (以儲存體帳戶為範圍),以遵循最低權限原則。 此做法只為使用者提供所需的最低權限,並建立更安全的實際執行環境。
下列範例將儲存體 Blob 資料參與者角色指派給使用者帳戶,以針對儲存體帳戶中的 Blob 資料提供讀取和寫入存取權。
重要
在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘,但在罕見情況下,可能需要長達八分鐘。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。
在 Azure 入口網站中,使用主要搜尋列或左側導覽找出您的儲存體帳戶。
在儲存體帳戶概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]。
在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。
從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]。
使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋「儲存體 Blob 資料參與者」,選取相符的結果,然後選擇 [下一步]。
在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]。
在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]。
選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。
將事件中樞程式庫新增至您的 JAVA 專案
在 pom.xml 中新增下列相依性。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.15.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
在 JAVA 檔案頂端新增下列
import
陳述式。import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer; import com.azure.identity.*;
建立名為
Receiver
的類別,並且將下列字串變數新增至此類別。 將預留位置取代為正確的值。重要
將預留位置取代為正確的值。
- 以事件中樞命名空間名稱取代
<NAMESPACE NAME>
。 <EVENT HUB NAME>
換成您在命名空間中的事件中樞名稱。
private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net"; private static final String eventHubName = "<EVENT HUB NAME>";
- 以事件中樞命名空間名稱取代
將下列
main
方法新增至類別。重要
將預留位置取代為正確的值。
<STORAGE ACCOUNT NAME>
使用 Azure 儲存體帳戶名稱。<CONTAINER NAME>
儲存體帳戶中 Blob 容器的名稱
// create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD) .build(); // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .credential(credential) .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net") .containerName("<CONTAINER NAME>") .buildAsyncClient(); // Create an event processor client to receive and process events and errors. EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder() .fullyQualifiedNamespace(namespaceName) .eventHubName(eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)) .credential(credential) .buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process");
將處理事件和錯誤的兩個協助程式方法 (
PARTITION_PROCESSOR
和ERROR_HANDLER
) 新增至Receiver
類別。public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };
建置程式,並確定沒有任何錯誤。
執行應用程式
先執行接收者應用程式。
然後,執行傳送者應用程式。
在接收者應用程式視窗中,確認您看到傳送者應用程式所發佈的事件。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar
按下接收者應用程式視窗中 ENTER 以停止應用程式。
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
下一步
請參閱 GitHub 上的下列範例: