Spring Cloud Stream 的 Spring Cloud Azure 支援
本文適用於:✅ 4.19.0 ✅ 5.19.0 版
Spring Cloud Stream 是建置與共用傳訊系統連線的高度可調整事件驅動微服務的架構。
此架構提供一個彈性的程序設計模型,建置在已建立且熟悉的 Spring 慣用語和最佳做法上。 這些最佳做法包括持續發行/子語意、取用者群組和具狀態數據分割的支援。
目前的系結器實作包括:
-
spring-cloud-azure-stream-binder-eventhubs
- 如需詳細資訊,請參閱 適用於 Azure 事件中樞的 Spring Cloud Stream Binder -
spring-cloud-azure-stream-binder-servicebus
- 如需詳細資訊,請參閱 適用於 Azure 服務總線的 Spring Cloud Stream Binder
適用於 Azure 事件中樞的 Spring Cloud Stream Binder
重要概念
適用於 Azure 事件中樞的 Spring Cloud Stream Binder 提供 Spring Cloud Stream 架構的系結實作。 此實作會在其基礎上使用 Spring Integration 事件中樞通道配接器。 從設計的觀點來看,事件中樞與 Kafka 類似。 此外,事件中樞也可以透過 Kafka API 存取。 如果您的專案與 Kafka API 有緊密的相依性,您可以嘗試使用 Kafka API 範例 事件中樞
取用者群組
事件中樞提供與 Apache Kafka 類似的取用者群組支援,但邏輯稍有不同。 雖然 Kafka 會將所有認可的位移儲存在訊息代理程式中,但您必須手動儲存事件中樞訊息的位移。 事件中樞 SDK 提供函式,以將這類位移儲存在 Azure 記憶體內。
數據分割支援
事件中樞提供與 Kafka 類似的實體分割區概念。 但與 Kafka 在取用者和分割區之間的自動重新平衡不同,事件中樞提供一種先佔模式。 記憶體帳戶可作為租用,以判斷哪些取用者擁有哪個分割區。 當新的取用者啟動時,它會嘗試從負載最重的取用者竊取一些分割區,以達到工作負載平衡。
若要指定負載平衡策略,則會提供 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
的屬性。 如需詳細資訊,請參閱 取用者屬性 一節。
Batch 取用者支援
Spring Cloud Azure Stream 事件中樞系結器支援 Spring Cloud Stream Batch 取用者功能。
若要使用批次取用者模式,請將 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
屬性設定為 true
。 啟用時,會收到具有批次事件清單承載的訊息,並傳遞至 Consumer
函式。 每個訊息標頭也會轉換成清單,其中內容是從每個事件剖析的相關聯標頭值。 分割區標識碼、檢查點器和最後一個加入佇列屬性的公用標頭會顯示為單一值,因為整個事件批次共用相同的值。 如需詳細資訊,請參閱 Spring Integration
注意
只有在使用 MANUAL
檢查點模式時,才會有檢查點標頭。
批次取用者的檢查點支援兩種模式:BATCH
和 MANUAL
。
BATCH
模式是一種自動檢查點模式,一旦系結器收到事件,就會將整個事件批次檢查在一起。
MANUAL
模式是讓使用者檢查事件。 使用時,Checkpointer
會傳遞至訊息標頭,使用者可以使用它進行檢查點檢查。
您可以藉由設定前置詞為 max-size
的 max-wait-time
和 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
屬性來指定批次大小。
max-size
屬性是必要的,而且 max-wait-time
屬性是選擇性的。 如需詳細資訊,請參閱 取用者屬性 一節。
相依性設定
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Azure Stream 事件中樞入門版,如下列 Maven 範例所示:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
配置
系結器提供下列三個組態選項部分:
聯機組態屬性
本節包含用來連線到 Azure 事件中樞的組態選項。
注意
如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。
spring-cloud-azure-stream-binder-eventhubs 的聯機可設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.enabled | 布爾 | 是否啟用 Azure 事件中樞。 |
spring.cloud.azure.eventhubs.connection-string | 字串 | 事件中樞命名空間連接字串值。 |
spring.cloud.azure.eventhubs.namespace | 字串 | 事件中樞命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成 |
spring.cloud.azure.eventhubs.domain-name | 字串 | Azure 事件中樞命名空間值的功能變數名稱。 |
spring.cloud.azure.eventhubs.custom-endpoint-address | 字串 | 自訂端點位址。 |
提示
一般 Azure 服務 SDK 組態選項也可以針對 Spring Cloud Azure Stream 事件中樞系結器進行設定。
Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.
或 spring.cloud.azure.eventhubs.
的前置詞來設定。
系結器預設也支援 spring Could Azure Resource Manager 。 若要瞭解如何擷取具有未授與
檢查點組態屬性
本節包含記憶體 Blob 服務的組態選項,用於保存分割區擁有權和檢查點資訊。
注意
從 4.0.0 版開始,當 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 未手動啟用時,將不會自動建立記憶體容器,且名稱 來自 spring.cloud.stream.bindings.binding-name.destination。
spring-cloud-azure-stream-binder-eventhubs 的檢查點設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | 布爾 | 如果不存在,是否允許建立容器。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | 字串 | 記憶體帳戶的名稱。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | 字串 | 儲存體帳戶存取金鑰。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | 字串 | 記憶體容器名稱。 |
提示
常見的 Azure 服務 SDK 組態選項也可以針對記憶體 Blob 檢查點存放區進行設定。
Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.
或 spring.cloud.azure.eventhubs.processor.checkpoint-store
的前置詞來設定。
Azure 事件中樞系結組態屬性
下列選項分為四個區段:取用者屬性、進階取用者組態、產生者屬性和進階生產者設定。
取用者屬性
這些屬性會透過 EventHubsConsumerProperties
公開。
注意
為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 事件中樞支持設定所有通道的值,格式為 spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
。
spring-cloud-azure-stream-binder-eventhubs 的取用者可設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | 取用者決定如何檢查點訊息時使用的檢查點模式 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | 整數 | 決定每個分割區要執行一個檢查點的訊息數量。 只有在使用 PARTITION_COUNT 檢查點模式時才會生效。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | 期間 | 決定執行一個檢查點的時間間隔。 只有在使用 TIME 檢查點模式時才會生效。 |
spring.cloud.stream.eventhubs.bindings。<binding-name.consumer.batch.max-size | 整數 | 批次中的事件數目上限。 批次取用者模式的必要專案。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | 期間 | 批次耗用的最大持續時間。 只有在啟用批次取用者模式且為選擇性時,才會生效。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | 期間 | 更新的時間間隔持續時間。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | 負載平衡策略。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | 期間 | 分割區擁有權到期的時間持續時間。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | 布爾 | 事件處理器是否應該在其相關聯的分割區上要求最後一個加入佇列事件的資訊,並追蹤接收事件時的資訊。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | 整數 | 取用者用來控制事件中樞取用者會在本機主動接收和排入佇列的事件數目。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | 將索引鍵對應為分割區標識碼,以及 StartPositionProperties 的值 |
如果數據分割的檢查點不存在於檢查點存放區中,則包含要用於每個數據分割之事件位置的對應。 此對應是以分割區標識碼為索引鍵。 |
注意
initial-partition-event-position
組態接受 map
來指定每個事件中樞的初始位置。 因此,其索引鍵是分割區標識符,而值是 StartPositionProperties
,其中包含位移、序號、加入佇列日期時間和是否包含的屬性。 例如,您可以將它設定為
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
進階取用者設定
上述 連線、檢查點和 常見的 Azure SDK 用戶端 組態支援自定義每個系結器取用者,您可以使用前置詞 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
進行設定。
產生者屬性
這些屬性會透過 EventHubsProducerProperties
公開。
注意
為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 事件中樞支持設定所有通道的值,格式為 spring.cloud.stream.eventhubs.default.producer.<property>=<value>
。
spring-cloud-azure-stream-binder-eventhubs 的產生者可設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | 布爾 | 產生者同步處理的切換旗標。 如果為 true,產生者會在傳送作業之後等候回應。 |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | 長 | 在傳送作業之後等候回應的時間量。 只有在啟用同步產生者時才會生效。 |
進階生產者設定
上述 連線 和 常見的 Azure SDK 用戶端 組態支援自定義每個系結器產生者,您可以使用前置詞 spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
進行設定。
基本用法
從事件中樞傳送和接收訊息
使用認證資訊填入組態選項。
針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
定義供應商和取用者。
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
數據分割支援
系統會建立具有使用者提供數據分割資訊的 PartitionSupplier
,以設定要傳送之訊息的分割區資訊。 下列流程圖顯示取得分割區識別碼和索引鍵之不同優先順序的程式:
Batch 取用者支援
提供批次組態選項,如下列範例所示:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
定義供應商和取用者。
針對以
BATCH
的檢查點模式,您可以使用下列程式代碼來傳送訊息,並以批次方式取用。@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
針對以
MANUAL
的檢查點模式,您可以使用下列程式代碼來傳送訊息,並以批次方式取用/檢查點。@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
注意
在批次取用模式中,Spring Cloud Stream 系結器的預設內容類型是 application/json
,因此請確定訊息承載與內容類型一致。 例如,當使用預設內容類型的 application/json
接收具有 String
承載的訊息時,承載應該 JSON String
,並括住原始 String
文字的雙引號。 針對 text/plain
內容類型,它可以是直接 String
物件。 如需詳細資訊,請參閱 Spring Cloud Stream 內容類型交涉。
處理錯誤訊息
處理輸出系結錯誤訊息
根據預設,Spring Integration 會建立名為
errorChannel
的全域錯誤通道。 設定下列訊息端點來處理輸出系結錯誤訊息。@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
處理輸入系結錯誤訊息
Spring Cloud Stream 事件中樞系結器支援一個解決方案來處理輸入訊息系結的錯誤:錯誤處理程式。
錯誤處理程式:
Spring Cloud Stream 會公開一種機制,讓您藉由新增可接受
Consumer
實例的ErrorMessage
來提供自定義錯誤處理程式。 如需詳細資訊,請參閱 Spring Cloud Stream 檔中 處理錯誤訊息。系結預設錯誤處理程式
設定單一
Consumer
豆,以取用所有輸入系結錯誤訊息。 下列預設函式會訂閱每個輸入系結錯誤通道:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
您也需要將
spring.cloud.stream.default.error-handler-definition
屬性設定為函式名稱。系結特定錯誤處理程式
設定
Consumer
豆取用特定的輸入系結錯誤訊息。 下列函式會訂閱特定的輸入系結錯誤通道,且優先順序高於系結預設錯誤處理程式:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
您也需要將
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
屬性設定為函式名稱。
事件中樞訊息標頭
如需支援的基本訊息標頭,請參閱 Spring Integration
多個系結器支援
使用多個系結器也支援連線到多個事件中樞命名空間。 此範例會採用連接字串作為範例。 也支援服務主體和受控識別的認證。 您可以在每個系結器的環境設定中設定相關的屬性。
若要搭配事件中樞使用多個系結器,請在 application.yml 檔案中設定下列屬性:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
注意
上一個應用程式檔會示範如何為應用程式設定所有系結的單一預設輪詢器。 如果您要設定特定系結的輪詢器,您可以使用設定,例如
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
。我們需要定義兩個供應商和兩個取用者:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
資源布建
事件中樞系結器支援布建事件中樞和取用者群組,使用者可以使用下列屬性來啟用布建。
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
樣品
如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫
適用於 Azure 服務總線的 Spring Cloud Stream Binder
重要概念
適用於 Azure 服務總線的 Spring Cloud Stream Binder 提供 Spring Cloud Stream 架構的系結實作。 此實作會在其基礎上使用 Spring Integration Service 總線通道配接器。
排程的訊息
此系結器支援將訊息提交至主題以進行延遲處理。 用戶可以傳送具有標頭 x-delay
以毫秒表示訊息延遲時間的排程訊息。 訊息會在 x-delay
毫秒之後傳遞至個別主題。
取用者群組
服務總線主題提供與 Apache Kafka 類似的取用者群組支援,但邏輯稍有不同。
此系結器依賴主題 Subscription
作為取用者群組。
相依性設定
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Azure Stream 服務總線入門版,如下列 Maven 範例所示:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
配置
系結器提供下列兩個組態選項部分:
聯機組態屬性
本節包含用來連線到 Azure 服務總線的組態選項。
注意
如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。
spring-cloud-azure-stream-binder-servicebus 的聯機可設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.azure.servicebus.enabled | 布爾 | 是否啟用 Azure 服務總線。 |
spring.cloud.azure.servicebus.connection-string | 字串 | 服務總線命名空間連接字串值。 |
spring.cloud.azure.servicebus.custom-endpoint-address | 字串 | 連接到服務總線時要使用的自訂端點位址。 |
spring.cloud.azure.servicebus.namespace | 字串 | 服務總線命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成 |
spring.cloud.azure.servicebus.domain-name | 字串 | Azure 服務總線命名空間值的功能變數名稱。 |
注意
一般 Azure 服務 SDK 組態選項也可以針對 Spring Cloud Azure Stream 服務總線系結器進行設定。
Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.
或 spring.cloud.azure.servicebus.
的前置詞來設定。
系結器預設也支援 spring Could Azure Resource Manager 。 若要瞭解如何擷取具有未授與
Azure 服務總線系結組態屬性
下列選項分為四個區段:取用者屬性、進階取用者組態、產生者屬性和進階生產者設定。
取用者屬性
這些屬性會透過 ServiceBusConsumerProperties
公開。
注意
為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 服務總線支援設定所有通道的值,格式為 spring.cloud.stream.servicebus.default.consumer.<property>=<value>
。
spring-cloud-azure-stream-binder-servicebus 的取用者可設定屬性:
財產 | 類型 | 違約 | 描述 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | 布爾 | 假 | 如果失敗的訊息會路由傳送至 DLQ。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | 整數 | 1 | 服務總線處理器客戶端應該處理的最大並行訊息。 啟用工作階段時,它會套用至每個工作階段。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | 整數 | 零 | 在任何指定時間要處理的並行會話數目上限。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | 布爾 | 零 | 是否啟用工作階段。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | 整數 | 0 | 服務總線處理器用戶端的預先擷取計數。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | 沒有 | 要連接的子佇列類型。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | 期間 | 5m | 繼續自動更新鎖定的時間量。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | 服務總線處理器用戶端的接收模式。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | 布爾 | 真 | 是否要自動解決訊息。 如果設定為 false,將會新增 Checkpointer 的訊息標頭,讓開發人員手動解決訊息。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-mb | 長 | 1024 | 佇列/主題的大小上限,以 MB 為單位,這是為佇列/主題配置的記憶體大小。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | 期間 | P10675199DT2H48M5.4775807S。 (10675199天、2小時、48分、5秒和477毫秒) | 訊息到期的持續時間,從訊息傳送至服務總線時開始。 |
重要
當您使用 azure Resource Manager
進階取用者設定
上述 連線 和 常見的 Azure SDK 用戶端 組態支援自定義每個系結器取用者,您可以使用前置詞 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
進行設定。
產生者屬性
這些屬性會透過 ServiceBusProducerProperties
公開。
注意
為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 服務總線支援設定所有通道的值,格式為 spring.cloud.stream.servicebus.default.producer.<property>=<value>
。
spring-cloud-azure-stream-binder-servicebus 的生產者可設定屬性:
財產 | 類型 | 違約 | 描述 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | 布爾 | 假 | 切換產生者同步處理的旗標。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | 長 | 10000 | 傳送產生者的逾時值。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | 零 | 系結產生者所需的服務總線實體類型。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-mb | 長 | 1024 | 佇列/主題的大小上限,以 MB 為單位,這是為佇列/主題配置的記憶體大小。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | 期間 | P10675199DT2H48M5.4775807S。 (10675199天、2小時、48分、5秒和477毫秒) | 訊息到期的持續時間,從訊息傳送至服務總線時開始。 |
重要
使用系結產生者時,必須設定 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
的屬性。
進階生產者設定
上述 連線 和 常見的 Azure SDK 用戶端 組態支援自定義每個系結器產生者,您可以使用前置詞 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
進行設定。
基本用法
從/到服務總線傳送和接收訊息
使用認證資訊填入組態選項。
針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
定義供應商和取用者。
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
數據分割索引鍵支援
系結器支援 服務總線分割,方法是在訊息標頭中設定分割區索引鍵和會話標識符。 本節介紹如何設定訊息的數據分割索引鍵。
Spring Cloud Stream 提供分割區索引鍵 SpEL 運算式屬性 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
。 例如,將此屬性設定為 "'partitionKey-' + headers[<message-header-key>]"
,並新增名為 message-header-key 的標頭。 在評估表達式以指派分割區索引鍵時,Spring Cloud Stream 會使用此標頭的值。 下列程式代碼提供範例產生者:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
會話支援
系結器支援服務總線
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
注意
根據 服務總線分割,會話標識符的優先順序高於分割區索引鍵。 因此,當同時設定 ServiceBusMessageHeaders#SESSION_ID
和 ServiceBusMessageHeaders#PARTITION_KEY
標頭時,會話標識碼的值最終會用來覆寫分割區索引鍵的值。
處理錯誤訊息
處理輸出系結錯誤訊息
根據預設,Spring Integration 會建立名為
errorChannel
的全域錯誤通道。 設定下列訊息端點來處理輸出系結錯誤訊息。@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
處理輸入系結錯誤訊息
Spring Cloud Stream 服務總線系結器支援兩個解決方案來處理輸入訊息系結的錯誤:系結器錯誤處理程式和處理程式。
Binder 錯誤處理程式:
默認系結器錯誤處理程式會處理輸入系結。 啟用
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
時,您可以使用此處理程式將失敗的訊息傳送至寄不出的信件佇列。 否則,會放棄失敗的訊息。 系結器錯誤處理程式與其他提供的錯誤處理程式互斥。錯誤處理程式:
Spring Cloud Stream 會公開一種機制,讓您藉由新增可接受
Consumer
實例的ErrorMessage
來提供自定義錯誤處理程式。 如需詳細資訊,請參閱 Spring Cloud Stream 檔中 處理錯誤訊息。系結預設錯誤處理程式
設定單一
Consumer
豆,以取用所有輸入系結錯誤訊息。 下列預設函式會訂閱每個輸入系結錯誤通道:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
您也需要將
spring.cloud.stream.default.error-handler-definition
屬性設定為函式名稱。系結特定錯誤處理程式
設定
Consumer
豆取用特定的輸入系結錯誤訊息。 下列函式會訂閱優先順序高於系結預設錯誤處理程式的特定輸入系結錯誤通道。@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
您也需要將
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
屬性設定為函式名稱。
服務總線訊息標頭
如需支援的基本訊息 標頭,請參閱 Spring Integration
注意
設定分割區索引鍵時,訊息標頭的優先順序高於 Spring Cloud Stream 屬性。 因此,只有當未設定任何 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
和 ServiceBusMessageHeaders#SESSION_ID
標頭時,ServiceBusMessageHeaders#PARTITION_KEY
才會生效。
多個系結器支援
使用多個系結器也支援連線到多個服務總線命名空間。 此範例會採用連接字串作為範例。 也支援服務主體和受控識別的認證,用戶可以在每個系結器的環境設定中設定相關的屬性。
若要使用 ServiceBus 的多個系結器,請在 application.yml 檔案中設定下列屬性:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
注意
上一個應用程式檔會示範如何為應用程式設定所有系結的單一預設輪詢器。 如果您要設定特定系結的輪詢器,您可以使用設定,例如
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
。我們需要定義兩個供應商和兩個消費者
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
資源布建
服務總線系結器支援布建佇列、主題和訂用帳戶,使用者可以使用下列屬性來啟用布建。
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
自訂服務總線客戶端屬性
開發人員可以使用 AzureServiceClientBuilderCustomizer
來自定義服務總線用戶端屬性。 下列範例會自訂 sessionIdleTimeout
中的 ServiceBusClientBuilder
屬性:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
樣品
如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫