將訊息傳送到 Azure 服務匯流排主題,並從訂用帳戶接收來自主題的訊息 (Java)
在本快速入門中,您會使用 azure-messaging-servicebus 套件撰寫 Java 程式碼,以將訊息傳送至 Azure 服務匯流排主題,然後從該主題的訂用帳戶接收訊息。
注意
本快速入門提供逐步指示,說明將一批訊息傳送至服務匯流排主題,以及從主題訂用帳戶接收那些訊息的簡單案例。 在 GitHub 上的 Azure SDK for Java 存放庫中,您可找到預先建置的 Azure 服務匯流排 Java 樣本。
提示
如果您在 Spring 應用程式中使用 Azure 服務匯流排資源,建議您考慮將 Spring Cloud Azure (機器翻譯) 作為替代方案。 Spring Cloud Azure 是開放原始碼專案,可提供與 Azure 服務的 Spring 無縫整合。 若要深入了解 Spring Cloud Azure,以及查看使用服務匯流排的範例,請參閱使用 Azure 服務匯流排搭配 Spring Cloud Stream (機器翻譯)。
必要條件
- Azure 訂用帳戶。 若要完成此教學課程,您需要 Azure 帳戶。 您可以啟用自己的 Visual Studio 或 MSDN 訂閱者權益或註冊免費帳戶。
- 安裝 Azure SDK for Java。 如果您使用的是 Eclipse,則可以安裝包含 Azure SDK for Java 的適用於 Eclipse 的 Azure 工具組。 然後您可以將 Microsoft Azure Libraries for Java 新增至您的專案。 如果您正在使用 IntelliJ,請參閱安裝 Azure Toolkit for IntelliJ。
在 Azure 入口網站中建立命名空間
若要開始在 Azure 中使用服務匯流排傳訊實體,您必須先使用 Azure 中的唯一名稱建立命名空間。 命名空間為應用程式內的服務匯流排資源 (佇列、主題等) 提供範圍容器。
若要建立命名空間:
登入 Azure 入口網站。
瀏覽至 [所有服務] 頁面。
在左側導覽列上,從類別清單中選取 [整合],將滑鼠停留在 [服務匯流排] 上方,然後選取 [服務匯流排] 圖格上的 + 按鈕。
在 [建立命名空間] 頁面的 [基本] 標籤中,遵循下列步驟:
針對 [訂用帳戶],選擇要在其中建立命名空間的 Azure 訂用帳戶。
針對 [ 資源群組],選擇現有的資源群組,或建立新的資源群組。
輸入命名空間的名稱。 命名空間名稱應遵循下列命名慣例:
- 名稱在整個 Azure 中必須是唯一的。 系統會立即檢查此名稱是否可用。
- 名稱長度至少 6 個字元,最多 50 個字元。
- 名稱只能包含字母、數位、連字元
-
。 - 名稱開頭必須為字母,且結尾必須為字母或數字。
- 名稱結尾不是
-sb
或-mgmt
。
針對 [位置],選擇應裝載命名空間的區域。
針對定價層,選取命名空間的定價層 (基本、標準或進階)。 針對本快速入門,選取 [標準]。
如果您選取 [進階層 ],請選取您是否可以啟用 命名空間的異地複 寫。 異地複寫功能可確保命名空間的中繼資料和資料會持續從主要區域複寫至一或多個次要區域。
重要
如果您想要使用主題和訂用帳戶,請選擇 [標準] 或 [進階]。 基本定價層不支援主題/訂用帳戶。
若已選取 [進階] 定價層,請指定傳訊單位數目。 進階層可讓您的資源在 CPU 和記憶體層級上獲得隔離,讓每個工作負載能夠獨立執行。 此資源容器稱為傳訊單位。 進階命名空間都至少有一個傳訊單位。 您可以為每個服務匯流排的進階命名空間選取 1、2、4、8 或 16 個傳訊單位。 如需詳細資訊,請參閱服務匯流排進階傳訊。
選取頁面底部的 [檢閱 + 建立] 。
在 [檢閱 + 建立] 頁面上檢閱設定,然後選取 [建立]。
一旦部署資源成功,請在部署頁面上選取 [移至資源]。
您會看到服務匯流排命名空間的首頁。
使用 Azure 入口網站建立主題
在 [服務匯流排 命名空間] 頁面上,展開左側導覽功能表上的 [實體],然後選取左側功能表上的 [主題]。
選取工具列上的 [+ 主題]。
輸入主題的名稱。 保留其他選項的預設值。
選取 建立。
針對主題建立訂用帳戶
選取您在上一節中建立的主題。
在 [服務匯流排主題] 頁面上,選取工具列上的 [+ 訂用帳戶]。
在 [建立訂用帳戶] 頁面上,遵循下列步驟:
輸入 S1 作為訂用帳戶的 [名稱]。
然後,選取 [建立] 以建立訂用帳戶。
向 Azure 驗證應用程式
本快速入門顯示兩種連線到 Azure 服務匯流排的方式:無密碼和連接字串。
第一個選項顯示如何使用 Microsoft Entra ID 中的安全性主體和角色型存取控制 (RBAC),來連線到服務匯流排命名空間。 您不需要擔心在程式碼或設定檔或 Azure Key Vault 等安全儲存體中,有硬式編碼連接字串。
第二個選項顯示如何使用連接字串來連線到服務匯流排命名空間。 如果您不熟悉 Azure,則可能會發現連接字串選項更容易遵循。 建議在真實世界應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概觀頁面上,深入了解無密碼驗證。
將角色指派給 Microsoft Entra 使用者
在本機開發時,請確定連線到 Azure 服務匯流排的使用者帳戶具有正確的權限。 您需要 Azure 服務匯流排資料擁有者角色,才能傳送和接收訊息。 若要將此角色指派給您自己,您需要使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write
動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上,深入了解角色指派的可用範圍。
下列範例會將 Azure Service Bus Data Owner
角色指派給您的使用者帳戶,該角色提供對 Azure 服務匯流排資源的完整存取權。 在實際案例中,遵循最低權限原則,只為使用者提供更安全實際執行環境所需的最低權限。
Azure 服務匯流排的 Azure 內建角色
對於 Azure 服務匯流排來說,透過 Azure 入口網站和 Azure 資源管理 API 來的管理命名空間和所有相關資源的作業,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取服務匯流排命名空間:
- Azure 服務匯流排資料擁有者:啟用對服務匯流排命名空間及其實體 (佇列、主題、訂用帳戶和篩選條件) 的資料存取。 此角色的成員可以從佇列或主題/訂用帳戶傳送和接收訊息。
- Azure 服務匯流排資料傳送者:使用此角色來授與服務匯流排命名空間及其實體的資料傳送權限。
- Azure 服務匯流排資料接收者:使用此角色來授與服務匯流排命名空間及其實體的資料接收權限。
如果您想要建立自訂角色,請參閱服務匯流排作業所需的權限。
將 Microsoft Entra 使用者新增至 Azure 服務匯流排擁有者角色
將您的 Microsoft Entra 使用者名稱新增至服務匯流排命名空間層級的 Azure 服務匯流排資料擁有者角色。 其可讓在使用者帳戶內容中執行的應用程式將訊息傳送至佇列或主題,以及從佇列或主題的訂用帳戶接收訊息。
重要
在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘。 在少數情況下,可能需要長達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。
如果沒有在 Azure 入口網站中開啟 [服務匯流排命名空間] 頁面,請使用主要搜尋列或左側導覽找出您的服務匯流排命名空間。
在概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]。
在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。
從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]。
使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋
Azure Service Bus Data Owner
並選取相符的結果。 接著,選擇 [下一步]。在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]。
在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]。
選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。
傳送訊息至主題
在本節中,您將建立 JAVA 主控台專案,並新增程式碼以將訊息傳送至您建立的主題。
建立 Java 主控台專案
使用 Eclipse 或您選擇的工具建立 Java 專案。
設定應用程式以使用服務匯流排
新增 Azure 核心及 Azure 服務匯流排程式庫的參考。
若您使用 Eclipse 且已建立 JAVA 主控台應用程式,請將 JAVA 專案轉換成 Maven:以滑鼠右鍵按一下 [封裝總管] 視窗中的專案,選取 [設定] -> [轉換成 Maven 專案]。 接著如下列範例所示,將相依性新增至這兩個程式庫。
更新 pom.xml
檔案,將相依性新增至 Azure 服務匯流排和 Azure 身分識別套件。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.13.3</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
新增程式碼以將訊息傳送到主題
在 Java 檔案頂端新增下列
import
陳述式。在類別中,定義要保存連接字串 (無密碼案例不需要)、主題名稱和訂用帳戶名稱的變數。
在類別中新增名為
sendMessage
的方法,以將一則訊息傳送至主題。重要
以您的服務匯流排命名空間名稱取代
NAMESPACENAME
。static void sendMessage() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .sender() .topicName(topicName) .buildClient(); // send one message to the topic senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the topic: " + topicName); }
在類別中新增名為
createMessages
的方法,以建立訊息清單。 一般來說,您會從應用程式的不同部分取得這些訊息。 在這裡,我們會建立範例訊息清單。static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }
新增名為
sendMessageBatch
方法的方法,將訊息傳送至您所建立的主題。 這個方法會建立主題的ServiceBusSenderClient
,叫用createMessages
方法來取得訊息清單、準備一或多個批次,並將批次傳送至主題。重要
以您的服務匯流排命名空間名稱取代
NAMESPACENAME
。static void sendMessageBatch() { // create a token using the default Azure credential DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .sender() .topicName(topicName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the topic: " + topicName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the topic: " + topicName); } //close the client senderClient.close(); }
自訂用帳戶接收訊息
在本節中,您將藉由新增程式碼,從主題的訂用帳戶擷取訊息。
新增名為
receiveMessages
的方法以從訂用帳戶接收訊息。 這個方法會藉由指定處理訊息的處理常式,以及另一項處理錯誤的方式,來建立訂用帳戶的ServiceBusProcessorClient
。 然後方法會啟動處理器、等待數秒並列印收到的訊息,然後停止和關閉處理器。重要
- 以您的服務匯流排命名空間名稱取代
NAMESPACENAME
。 - 在程式碼中,以類別名稱取代
ServiceBusTopicTest::processMessage
中的ServiceBusTopicTest
。
// handles received messages static void receiveMessages() throws InterruptedException { DefaultAzureCredential credential = new DefaultAzureCredentialBuilder() .build(); // Create an instance of the processor through the ServiceBusClientBuilder ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .fullyQualifiedNamespace("NAMESPACENAME.servicebus.windows.net") .credential(credential) .processor() .topicName(topicName) .subscriptionName(subName) .processMessage(context -> processMessage(context)) .processError(context -> processError(context)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }
- 以您的服務匯流排命名空間名稱取代
新增
processMessage
方法,以處理從服務匯流排訂用帳戶收到的訊息。private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }
新增
processError
方法以處理錯誤訊息。private static void processError(ServiceBusErrorContext context) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }
更新
main
方法以叫用sendMessage
、sendMessageBatch
,並更新receiveMessages
方法以擲回InterruptedException
。public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
執行應用程式
執行程式後應該會看到類似下列的輸出:
如果您使用 Eclipse,請以滑鼠右鍵按一下專案,選取 [匯出],展開 [JAVA],選取 [可執行的 JAR 檔案],然後依照步驟建立可執行的 JAR 檔案。
如果您使用與新增至 Azure 服務匯流排資料擁有者角色不同的使用者帳戶登入機器,請遵循下列步驟。 否則請略過此步驟,然後在下一個步驟中繼續執行 Jar 檔案。
在機器上安裝 Azure CLI (機器翻譯)。
執行下列 CLI 命令以登入 Azure。 使用您新增至 Azure 服務匯流排資料擁有者角色的相同使用者帳戶。
az login
使用下列命令執行 Jar 檔案。
java -jar <JAR FILE NAME>
您會在主控台視窗中看到以下輸出。
Sent a single message to the topic: mytopic Sent a batch of messages to the topic: mytopic Starting the processor Processing message. Session: e0102f5fbaf646988a2f4b65f7d32385, Sequence #: 1. Contents: Hello, World! Processing message. Session: 3e991e232ca248f2bc332caa8034bed9, Sequence #: 2. Contents: First message Processing message. Session: 56d3a9ea7df446f8a2944ee72cca4ea0, Sequence #: 3. Contents: Second message Processing message. Session: 7bd3bd3e966a40ebbc9b29b082da14bb, Sequence #: 4. Contents: Third message
在 Azure 入口網站中服務匯流排命名空間的概觀頁面上,您可以看到傳入和傳出訊息計數。 請等候一分鐘左右,然後重新整理頁面以查看最新的值。
切換至中間窗格中的主題索引標籤,然後選取主題以查看主題的服務匯流排主題頁面。 在此頁面上,您應該會在訊息圖表中看到四個傳入和四個傳出訊息。
如果您註解 main
方法中的 receiveMessages
呼叫,然後再次執行應用程式,則會在服務匯流排主題頁面上看到 8 個傳入訊息 (4 則新訊息),但有四個傳出訊息。
如果您在此頁面上選取訂用帳戶,您會進入服務匯流排訂用帳戶頁面。 您可以在此頁面上看到作用中的訊息計數、寄不出的信件訊息計數等等。 在此範例中,收件者尚未收到四個作用中的訊息。
下一步
請參閱下列文件和範例: