具有 Azure 服務匯流排 的 Spring Cloud Stream
本文示範如何使用 Spring Cloud Stream Binder 將訊息傳送至和接收來自 服務匯流排 queues
和 topics
的訊息。
Azure 提供稱為「Azure 服務匯流排」(「服務匯流排」)的異步傳訊平臺,以進階消息佇列通訊協定 1.0(“AMQP 1.0”) 標準為基礎。 服務匯流排 可用於支援的 Azure 平台範圍。
必要條件
Azure 訂用帳戶 - 建立免費帳戶。
Java Development Kit (JDK) 第 8 版或更高版本。
Apache Maven 3.2 版或更高版本。
cURL 或類似的 HTTP 公用程式來測試功能。
Azure 服務匯流排 的佇列或主題。 如果您沒有佇列,請建立 服務匯流排 佇列或建立 服務匯流排 主題。
Spring Boot 應用程式。 如果您沒有這個應用程式,請使用 Spring Initializr 來建立 Maven 專案。 請務必選取 Maven 專案,然後在 [相依性] 底下新增 Spring Web 和 Azure 支援相依性,然後選取 [Java 第 8 版] 或更新版本。
注意
若要授與帳戶對 Azure 服務匯流排 資源的存取權,請將 和 Azure Service Bus Data Receiver
角色指派Azure Service Bus Data Sender
給您目前使用的 Microsoft Entra 帳戶。 如需授與存取角色的詳細資訊,請參閱使用 Azure 入口網站 和驗證和授權具有 Microsoft Entra 標識符的應用程式指派 Azure 角色,以存取 Azure 服務匯流排 實體。
重要
需要 Spring Boot 2.5 版或更高版本,才能完成本文中的步驟。
從 Azure 服務匯流排 傳送和接收訊息
使用 Azure 服務匯流排 的佇列或主題,您可以使用 Spring Cloud Azure Stream Binder 服務匯流排 來傳送和接收訊息。
若要安裝 Spring Cloud Azure Stream Binder 服務匯流排 模組,請將下列相依性新增至您的pom.xml檔案:
Spring Cloud Azure 材料帳單(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果您使用 Spring Boot 2.x,請務必將
spring-cloud-azure-dependencies
版本設定為4.19.0
。 此材料帳單 (BOM) 應該在<dependencyManagement>
pom.xml檔案的 區段中設定。 這可確保所有 Spring Cloud Azure 相依性都使用相同的版本。 如需此 BOM 所用版本的詳細資訊,請參閱 應該使用哪個版本的 Spring Cloud Azure。Spring Cloud Azure Stream Binder 服務匯流排 成品:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
編碼應用程式
使用下列步驟來設定應用程式,以使用 服務匯流排 佇列或主題來傳送和接收訊息。
在組態檔
application.properties
中設定 服務匯流排 認證。spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
下表描述群組態中的欄位:
欄位 描述 spring.cloud.azure.servicebus.namespace
指定您在 服務匯流排 中從 Azure 入口網站 取得的命名空間。 spring.cloud.stream.bindings.consume-in-0.destination
指定您在本教學課程中使用的 服務匯流排 佇列或 服務匯流排 主題。 spring.cloud.stream.bindings.supply-out-0.destination
指定用於輸入目的地的相同值。 spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
指定是否自動結算訊息。 如果設定為 false,將會新增 的 Checkpointer
訊息標頭,讓開發人員手動解決訊息。spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
指定輸出系結的實體類型可以是 queue
或topic
。spring.cloud.function.definition
指定要系結至系結所公開之外部目的地的功能豆。 spring.cloud.stream.poller.fixed-delay
指定預設輪詢器以毫秒為單位的固定延遲。 預設值為 1000 L。建議值為 60000。 spring.cloud.stream.poller.initial-delay
指定定期觸發程式的初始延遲。 預設值為 0。 編輯啟動類別檔案以顯示下列內容。
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
提示
在本教學課程中,組態或程式代碼中沒有任何驗證作業。 不過,連線到 Azure 服務需要驗證。 若要完成驗證,您需要使用 Azure Identity。 Spring Cloud Azure 使用
DefaultAzureCredential
,Azure 身分識別連結庫會提供它來協助您取得認證,而不需要變更任何程序代碼。DefaultAzureCredential
支援多種驗證方法,並在執行階段判斷應使用的方法。 這種方法可讓您的應用程式在不同的環境中使用不同的驗證方法(例如本機和生產環境),而不需要實作環境特定的程序代碼。 如需詳細資訊,請參閱 DefaultAzureCredential。若要在本機開發環境中完成驗證,您可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 如需詳細資訊,請參閱 Java 開發環境中的 Azure 驗證。 若要在 Azure 裝載環境中完成驗證,建議您使用使用者指派的受控識別。 如需詳細資訊,請參閱什麼是 Azure 資源受控識別?
啟動應用程式。 如下列範例的訊息將會張貼在應用程式記錄檔中:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed