共用方式為


Spring Cloud Stream 的 Spring Cloud Azure 支援

本文適用於:✅ 4.19.0 ✅ 5.19.0 版

Spring Cloud Stream 是建置與共用傳訊系統連線的高度可調整事件驅動微服務的架構。

此架構提供一個彈性的程序設計模型,建置在已建立且熟悉的 Spring 慣用語和最佳做法上。 這些最佳做法包括持續發行/子語意、取用者群組和具狀態數據分割的支援。

目前的系結器實作包括:

適用於 Azure 事件中樞的 Spring Cloud Stream Binder

重要概念

適用於 Azure 事件中樞的 Spring Cloud Stream Binder 提供 Spring Cloud Stream 架構的系結實作。 此實作會在其基礎上使用 Spring Integration 事件中樞通道配接器。 從設計的觀點來看,事件中樞與 Kafka 類似。 此外,事件中樞也可以透過 Kafka API 存取。 如果您的專案與 Kafka API 有緊密的相依性,您可以嘗試使用 Kafka API 範例 事件中樞

取用者群組

事件中樞提供與 Apache Kafka 類似的取用者群組支援,但邏輯稍有不同。 雖然 Kafka 會將所有認可的位移儲存在訊息代理程式中,但您必須手動儲存事件中樞訊息的位移。 事件中樞 SDK 提供函式,以將這類位移儲存在 Azure 記憶體內。

數據分割支援

事件中樞提供與 Kafka 類似的實體分割區概念。 但與 Kafka 在取用者和分割區之間的自動重新平衡不同,事件中樞提供一種先佔模式。 記憶體帳戶可作為租用,以判斷哪些取用者擁有哪個分割區。 當新的取用者啟動時,它會嘗試從負載最重的取用者竊取一些分割區,以達到工作負載平衡。

若要指定負載平衡策略,則會提供 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* 的屬性。 如需詳細資訊,請參閱 取用者屬性 一節。

Batch 取用者支援

Spring Cloud Azure Stream 事件中樞系結器支援 Spring Cloud Stream Batch 取用者功能

若要使用批次取用者模式,請將 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 屬性設定為 true。 啟用時,會收到具有批次事件清單承載的訊息,並傳遞至 Consumer 函式。 每個訊息標頭也會轉換成清單,其中內容是從每個事件剖析的相關聯標頭值。 分割區標識碼、檢查點器和最後一個加入佇列屬性的公用標頭會顯示為單一值,因為整個事件批次共用相同的值。 如需詳細資訊,請參閱 Spring IntegrationSpring Cloud Azure 支援的 事件中樞訊息標頭 一節。

注意

只有在使用 MANUAL 檢查點模式時,才會有檢查點標頭。

批次取用者的檢查點支援兩種模式:BATCHMANUALBATCH 模式是一種自動檢查點模式,一旦系結器收到事件,就會將整個事件批次檢查在一起。 MANUAL 模式是讓使用者檢查事件。 使用時,Checkpointer 會傳遞至訊息標頭,使用者可以使用它進行檢查點檢查。

您可以藉由設定前置詞為 max-sizemax-wait-timespring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch. 屬性來指定批次大小。 max-size 屬性是必要的,而且 max-wait-time 屬性是選擇性的。 如需詳細資訊,請參閱 取用者屬性 一節。

相依性設定

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Azure Stream 事件中樞入門版,如下列 Maven 範例所示:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

配置

系結器提供下列三個組態選項部分:

聯機組態屬性

本節包含用來連線到 Azure 事件中樞的組態選項。

注意

如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。

spring-cloud-azure-stream-binder-eventhubs 的聯機可設定屬性:

財產 類型 描述
spring.cloud.azure.eventhubs.enabled 布爾 是否啟用 Azure 事件中樞。
spring.cloud.azure.eventhubs.connection-string 字串 事件中樞命名空間連接字串值。
spring.cloud.azure.eventhubs.namespace 字串 事件中樞命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成
spring.cloud.azure.eventhubs.domain-name 字串 Azure 事件中樞命名空間值的功能變數名稱。
spring.cloud.azure.eventhubs.custom-endpoint-address 字串 自訂端點位址。

提示

一般 Azure 服務 SDK 組態選項也可以針對 Spring Cloud Azure Stream 事件中樞系結器進行設定。 Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.spring.cloud.azure.eventhubs.的前置詞來設定。

系結器預設也支援 spring Could Azure Resource Manager 。 若要瞭解如何擷取具有未授與 相關角色之安全性主體的連接字串,請參閱 Spring Could Azure Resource Manager的基本使用量 一節。

檢查點組態屬性

本節包含記憶體 Blob 服務的組態選項,用於保存分割區擁有權和檢查點資訊。

注意

從 4.0.0 版開始,當 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 未手動啟用時,將不會自動建立記憶體容器,且名稱 來自 spring.cloud.stream.bindings.binding-name.destination

spring-cloud-azure-stream-binder-eventhubs 的檢查點設定屬性:

財產 類型 描述
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 布爾 如果不存在,是否允許建立容器。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name 字串 記憶體帳戶的名稱。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key 字串 儲存體帳戶存取金鑰。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name 字串 記憶體容器名稱。

提示

常見的 Azure 服務 SDK 組態選項也可以針對記憶體 Blob 檢查點存放區進行設定。 Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.spring.cloud.azure.eventhubs.processor.checkpoint-store的前置詞來設定。

Azure 事件中樞系結組態屬性

下列選項分為四個區段:取用者屬性、進階取用者組態、產生者屬性和進階生產者設定。

取用者屬性

這些屬性會透過 EventHubsConsumerProperties公開。

注意

為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 事件中樞支持設定所有通道的值,格式為 spring.cloud.stream.eventhubs.default.consumer.<property>=<value>

spring-cloud-azure-stream-binder-eventhubs 的取用者可設定屬性:

財產 類型 描述
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode CheckpointMode 取用者決定如何檢查點訊息時使用的檢查點模式
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count 整數 決定每個分割區要執行一個檢查點的訊息數量。 只有在使用 PARTITION_COUNT 檢查點模式時才會生效。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval 期間 決定執行一個檢查點的時間間隔。 只有在使用 TIME 檢查點模式時才會生效。
spring.cloud.stream.eventhubs.bindings。<binding-name.consumer.batch.max-size 整數 批次中的事件數目上限。 批次取用者模式的必要專案。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time 期間 批次耗用的最大持續時間。 只有在啟用批次取用者模式且為選擇性時,才會生效。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval 期間 更新的時間間隔持續時間。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy 負載平衡策略。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval 期間 分割區擁有權到期的時間持續時間。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties 布爾 事件處理器是否應該在其相關聯的分割區上要求最後一個加入佇列事件的資訊,並追蹤接收事件時的資訊。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count 整數 取用者用來控制事件中樞取用者會在本機主動接收和排入佇列的事件數目。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position 將索引鍵對應為分割區標識碼,以及 StartPositionProperties 的值 如果數據分割的檢查點不存在於檢查點存放區中,則包含要用於每個數據分割之事件位置的對應。 此對應是以分割區標識碼為索引鍵。

注意

initial-partition-event-position 組態接受 map 來指定每個事件中樞的初始位置。 因此,其索引鍵是分割區標識符,而值是 StartPositionProperties,其中包含位移、序號、加入佇列日期時間和是否包含的屬性。 例如,您可以將它設定為

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
進階取用者設定

上述 連線檢查點常見的 Azure SDK 用戶端 組態支援自定義每個系結器取用者,您可以使用前置詞 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.進行設定。

產生者屬性

這些屬性會透過 EventHubsProducerProperties公開。

注意

為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 事件中樞支持設定所有通道的值,格式為 spring.cloud.stream.eventhubs.default.producer.<property>=<value>

spring-cloud-azure-stream-binder-eventhubs 的產生者可設定屬性:

財產 類型 描述
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync 布爾 產生者同步處理的切換旗標。 如果為 true,產生者會在傳送作業之後等候回應。
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout 在傳送作業之後等候回應的時間量。 只有在啟用同步產生者時才會生效。
進階生產者設定

上述 連線常見的 Azure SDK 用戶端 組態支援自定義每個系結器產生者,您可以使用前置詞 spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.進行設定。

基本用法

從事件中樞傳送和接收訊息

  1. 使用認證資訊填入組態選項。

    • 針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • 針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

  • 針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. 定義供應商和取用者。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

數據分割支援

系統會建立具有使用者提供數據分割資訊的 PartitionSupplier,以設定要傳送之訊息的分割區資訊。 下列流程圖顯示取得分割區識別碼和索引鍵之不同優先順序的程式:

圖表,其中顯示數據分割支援程式的流程圖。

Batch 取用者支援

  1. 提供批次組態選項,如下列範例所示:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. 定義供應商和取用者。

    針對以 BATCH的檢查點模式,您可以使用下列程式代碼來傳送訊息,並以批次方式取用。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    針對以 MANUAL的檢查點模式,您可以使用下列程式代碼來傳送訊息,並以批次方式取用/檢查點。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

注意

在批次取用模式中,Spring Cloud Stream 系結器的預設內容類型是 application/json,因此請確定訊息承載與內容類型一致。 例如,當使用預設內容類型的 application/json 接收具有 String 承載的訊息時,承載應該 JSON String,並括住原始 String 文字的雙引號。 針對 text/plain 內容類型,它可以是直接 String 物件。 如需詳細資訊,請參閱 Spring Cloud Stream 內容類型交涉

處理錯誤訊息

  • 處理輸出系結錯誤訊息

    根據預設,Spring Integration 會建立名為 errorChannel的全域錯誤通道。 設定下列訊息端點來處理輸出系結錯誤訊息。

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 處理輸入系結錯誤訊息

    Spring Cloud Stream 事件中樞系結器支援一個解決方案來處理輸入訊息系結的錯誤:錯誤處理程式。

    錯誤處理程式

    Spring Cloud Stream 會公開一種機制,讓您藉由新增可接受 Consumer 實例的 ErrorMessage 來提供自定義錯誤處理程式。 如需詳細資訊,請參閱 Spring Cloud Stream 檔中 處理錯誤訊息

    • 系結預設錯誤處理程式

      設定單一 Consumer 豆,以取用所有輸入系結錯誤訊息。 下列預設函式會訂閱每個輸入系結錯誤通道:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      您也需要將 spring.cloud.stream.default.error-handler-definition 屬性設定為函式名稱。

    • 系結特定錯誤處理程式

      設定 Consumer 豆取用特定的輸入系結錯誤訊息。 下列函式會訂閱特定的輸入系結錯誤通道,且優先順序高於系結預設錯誤處理程式:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      您也需要將 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition 屬性設定為函式名稱。

事件中樞訊息標頭

如需支援的基本訊息標頭,請參閱 Spring IntegrationSpring Cloud Azure 支援的 事件中樞訊息標頭 一節。

多個系結器支援

使用多個系結器也支援連線到多個事件中樞命名空間。 此範例會採用連接字串作為範例。 也支援服務主體和受控識別的認證。 您可以在每個系結器的環境設定中設定相關的屬性。

  1. 若要搭配事件中樞使用多個系結器,請在 application.yml 檔案中設定下列屬性:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    注意

    上一個應用程式檔會示範如何為應用程式設定所有系結的單一預設輪詢器。 如果您要設定特定系結的輪詢器,您可以使用設定,例如 spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000

  2. 我們需要定義兩個供應商和兩個取用者:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

資源布建

事件中樞系結器支援布建事件中樞和取用者群組,使用者可以使用下列屬性來啟用布建。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

樣品

如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫

適用於 Azure 服務總線的 Spring Cloud Stream Binder

重要概念

適用於 Azure 服務總線的 Spring Cloud Stream Binder 提供 Spring Cloud Stream 架構的系結實作。 此實作會在其基礎上使用 Spring Integration Service 總線通道配接器。

排程的訊息

此系結器支援將訊息提交至主題以進行延遲處理。 用戶可以傳送具有標頭 x-delay 以毫秒表示訊息延遲時間的排程訊息。 訊息會在 x-delay 毫秒之後傳遞至個別主題。

取用者群組

服務總線主題提供與 Apache Kafka 類似的取用者群組支援,但邏輯稍有不同。 此系結器依賴主題 Subscription 作為取用者群組。

相依性設定

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Azure Stream 服務總線入門版,如下列 Maven 範例所示:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

配置

系結器提供下列兩個組態選項部分:

聯機組態屬性

本節包含用來連線到 Azure 服務總線的組態選項。

注意

如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。

spring-cloud-azure-stream-binder-servicebus 的聯機可設定屬性:

財產 類型 描述
spring.cloud.azure.servicebus.enabled 布爾 是否啟用 Azure 服務總線。
spring.cloud.azure.servicebus.connection-string 字串 服務總線命名空間連接字串值。
spring.cloud.azure.servicebus.custom-endpoint-address 字串 連接到服務總線時要使用的自訂端點位址。
spring.cloud.azure.servicebus.namespace 字串 服務總線命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成
spring.cloud.azure.servicebus.domain-name 字串 Azure 服務總線命名空間值的功能變數名稱。

注意

一般 Azure 服務 SDK 組態選項也可以針對 Spring Cloud Azure Stream 服務總線系結器進行設定。 Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.spring.cloud.azure.servicebus.的前置詞來設定。

系結器預設也支援 spring Could Azure Resource Manager 。 若要瞭解如何擷取具有未授與 相關角色之安全性主體的連接字串,請參閱 Spring Could Azure Resource Manager的基本使用量 一節。

Azure 服務總線系結組態屬性

下列選項分為四個區段:取用者屬性、進階取用者組態、產生者屬性和進階生產者設定。

取用者屬性

這些屬性會透過 ServiceBusConsumerProperties公開。

注意

為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 服務總線支援設定所有通道的值,格式為 spring.cloud.stream.servicebus.default.consumer.<property>=<value>

spring-cloud-azure-stream-binder-servicebus 的取用者可設定屬性:

財產 類型 違約 描述
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected 布爾 如果失敗的訊息會路由傳送至 DLQ。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls 整數 1 服務總線處理器客戶端應該處理的最大並行訊息。 啟用工作階段時,它會套用至每個工作階段。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions 整數 在任何指定時間要處理的並行會話數目上限。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled 布爾 是否啟用工作階段。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count 整數 0 服務總線處理器用戶端的預先擷取計數。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue 沒有 要連接的子佇列類型。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration 期間 5m 繼續自動更新鎖定的時間量。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock 服務總線處理器用戶端的接收模式。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete 布爾 是否要自動解決訊息。 如果設定為 false,將會新增 Checkpointer 的訊息標頭,讓開發人員手動解決訊息。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-mb 1024 佇列/主題的大小上限,以 MB 為單位,這是為佇列/主題配置的記憶體大小。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live 期間 P10675199DT2H48M5.4775807S。 (10675199天、2小時、48分、5秒和477毫秒) 訊息到期的持續時間,從訊息傳送至服務總線時開始。

重要

當您使用 azure Resource Manager 時,您必須設定 屬性。 如需詳細資訊,請參閱 GitHub 上的 servicebus-queue-binder-arm 範例。

進階取用者設定

上述 連線常見的 Azure SDK 用戶端 組態支援自定義每個系結器取用者,您可以使用前置詞 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.進行設定。

產生者屬性

這些屬性會透過 ServiceBusProducerProperties公開。

注意

為了避免重複,因為版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 服務總線支援設定所有通道的值,格式為 spring.cloud.stream.servicebus.default.producer.<property>=<value>

spring-cloud-azure-stream-binder-servicebus 的生產者可設定屬性:

財產 類型 違約 描述
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync 布爾 切換產生者同步處理的旗標。
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout 10000 傳送產生者的逾時值。
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType 系結產生者所需的服務總線實體類型。
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-mb 1024 佇列/主題的大小上限,以 MB 為單位,這是為佇列/主題配置的記憶體大小。
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live 期間 P10675199DT2H48M5.4775807S。 (10675199天、2小時、48分、5秒和477毫秒) 訊息到期的持續時間,從訊息傳送至服務總線時開始。

重要

使用系結產生者時,必須設定 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type 的屬性。

進階生產者設定

上述 連線常見的 Azure SDK 用戶端 組態支援自定義每個系結器產生者,您可以使用前置詞 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.進行設定。

基本用法

從/到服務總線傳送和接收訊息

  1. 使用認證資訊填入組態選項。

    • 針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • 針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

  • 針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. 定義供應商和取用者。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

數據分割索引鍵支援

系結器支援 服務總線分割,方法是在訊息標頭中設定分割區索引鍵和會話標識符。 本節介紹如何設定訊息的數據分割索引鍵。

Spring Cloud Stream 提供分割區索引鍵 SpEL 運算式屬性 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression。 例如,將此屬性設定為 "'partitionKey-' + headers[<message-header-key>]",並新增名為 message-header-key 的標頭。 在評估表達式以指派分割區索引鍵時,Spring Cloud Stream 會使用此標頭的值。 下列程式代碼提供範例產生者:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

會話支援

系結器支援服務總線 訊息會話。 訊息的會話標識碼可以透過訊息標頭來設定。

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

注意

根據 服務總線分割,會話標識符的優先順序高於分割區索引鍵。 因此,當同時設定 ServiceBusMessageHeaders#SESSION_IDServiceBusMessageHeaders#PARTITION_KEY 標頭時,會話標識碼的值最終會用來覆寫分割區索引鍵的值。

處理錯誤訊息

  • 處理輸出系結錯誤訊息

    根據預設,Spring Integration 會建立名為 errorChannel的全域錯誤通道。 設定下列訊息端點來處理輸出系結錯誤訊息。

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 處理輸入系結錯誤訊息

    Spring Cloud Stream 服務總線系結器支援兩個解決方案來處理輸入訊息系結的錯誤:系結器錯誤處理程式和處理程式。

    Binder 錯誤處理程式

    默認系結器錯誤處理程式會處理輸入系結。 啟用 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected 時,您可以使用此處理程式將失敗的訊息傳送至寄不出的信件佇列。 否則,會放棄失敗的訊息。 系結器錯誤處理程式與其他提供的錯誤處理程式互斥。

    錯誤處理程式

    Spring Cloud Stream 會公開一種機制,讓您藉由新增可接受 Consumer 實例的 ErrorMessage 來提供自定義錯誤處理程式。 如需詳細資訊,請參閱 Spring Cloud Stream 檔中 處理錯誤訊息

    • 系結預設錯誤處理程式

      設定單一 Consumer 豆,以取用所有輸入系結錯誤訊息。 下列預設函式會訂閱每個輸入系結錯誤通道:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      您也需要將 spring.cloud.stream.default.error-handler-definition 屬性設定為函式名稱。

    • 系結特定錯誤處理程式

      設定 Consumer 豆取用特定的輸入系結錯誤訊息。 下列函式會訂閱優先順序高於系結預設錯誤處理程式的特定輸入系結錯誤通道。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      您也需要將 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition 屬性設定為函式名稱。

服務總線訊息標頭

如需支援的基本訊息 標頭,請參閱 Spring IntegrationSpring Cloud Azure 支援的 服務總線訊息標頭一節。

注意

設定分割區索引鍵時,訊息標頭的優先順序高於 Spring Cloud Stream 屬性。 因此,只有當未設定任何 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionServiceBusMessageHeaders#SESSION_ID 標頭時,ServiceBusMessageHeaders#PARTITION_KEY 才會生效。

多個系結器支援

使用多個系結器也支援連線到多個服務總線命名空間。 此範例會採用連接字串作為範例。 也支援服務主體和受控識別的認證,用戶可以在每個系結器的環境設定中設定相關的屬性。

  1. 若要使用 ServiceBus 的多個系結器,請在 application.yml 檔案中設定下列屬性:

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    注意

    上一個應用程式檔會示範如何為應用程式設定所有系結的單一預設輪詢器。 如果您要設定特定系結的輪詢器,您可以使用設定,例如 spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000

  2. 我們需要定義兩個供應商和兩個消費者

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

資源布建

服務總線系結器支援布建佇列、主題和訂用帳戶,使用者可以使用下列屬性來啟用布建。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

自訂服務總線客戶端屬性

開發人員可以使用 AzureServiceClientBuilderCustomizer 來自定義服務總線用戶端屬性。 下列範例會自訂 sessionIdleTimeout中的 ServiceBusClientBuilder 屬性:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

樣品

如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫