共用方式為


Spring Cloud Azure 對 Spring Integration 的支援

本文適用於:✅ 4.19.0 ✅ 5.19.0 版

適用於 Azure 的 Spring Integration Extension 提供適用於 JavaAzure SDK 所提供的各種服務的 Spring Integration 配接器。 我們提供這些 Azure 服務的 Spring Integration 支援:事件中樞、服務總線、記憶體佇列。 以下是支援的配接器清單:

Spring 與 Azure 事件中樞整合

重要概念

Azure 事件中樞是巨量數據串流平臺和事件擷取服務。 它可以每秒接收和處理數百萬個事件。 傳送至事件中樞的數據可以使用任何即時分析提供者或批處理/記憶體配接器來轉換和儲存。

Spring Integration 可在 Spring 型應用程式中啟用輕量型傳訊,並支援透過宣告式配接器與外部系統整合。 這些配接器會針對 Spring 支援遠端、傳訊和排程,提供更高層級的抽象概念。 事件中樞 擴充專案的 Spring Integration 提供 Azure 事件中樞的輸入和輸出通道適配卡和閘道。

注意

RxJava 支援 API 會從 4.0.0 版卸除。 如需詳細資訊,請參閱 Javadoc。

取用者群組

事件中樞提供與 Apache Kafka 類似的取用者群組支援,但邏輯稍有不同。 雖然 Kafka 會將所有認可的位移儲存在訊息代理程式中,但您必須手動儲存事件中樞訊息的位移。 事件中樞 SDK 提供函式,以將這類位移儲存在 Azure 記憶體內。

數據分割支援

事件中樞提供與 Kafka 類似的實體分割區概念。 但與 Kafka 在取用者和分割區之間的自動重新平衡不同,事件中樞提供一種先佔模式。 記憶體帳戶可作為租用,以判斷哪一個取用者擁有哪個分割區。 當新的取用者啟動時,它會嘗試從最繁重的取用者竊取一些分割區,以達到工作負載平衡。

若要指定負載平衡策略,開發人員可以使用 EventHubsContainerProperties 來進行設定。 如需如何設定 的範例,請參閱下一節

Batch 取用者支援

EventHubsInboundChannelAdapter 支援批次取用模式。 若要啟用,用戶可以在建構 ListenerMode.BATCH 實例時,將接聽程式模式指定為 EventHubsInboundChannelAdapter。 啟用時,訊息,其中承載會接收並傳遞至下游通道的批次事件清單。 每個訊息標頭也會轉換成清單,其中內容是從每個事件剖析的相關聯標頭值。 針對分割區標識碼、檢查點器和最後一個加入佇列屬性的公用標頭,它們會以單一值呈現給整個事件批次共用相同的值。 如需詳細資訊,請參閱 事件中樞訊息標頭 一節。

注意

只有在使用 MANUAL 檢查點模式時,才存在檢查點標頭

批次取用者的檢查點支援兩種模式:BATCHMANUALBATCH 模式是一種自動檢查點模式,會在收到事件后,將整個事件批次一起檢查點。 MANUAL 模式是讓使用者檢查事件。 使用時,Checkpointer 會傳遞至訊息標頭,而且使用者可以使用它來執行檢查點。

批次取用原則可由 max-sizemax-wait-time的屬性指定,其中 max-size 是選擇性 max-wait-time 時的必要屬性。 若要指定批次取用策略,開發人員可以使用 EventHubsContainerProperties 來進行設定。 如需如何設定 的範例,請參閱下一節

相依性設定

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

配置

此入門提供下列 3 個部分的組態選項:

聯機組態屬性

本節包含用來連線到 Azure 事件中樞的組態選項。

注意

如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。

spring-cloud-azure-starter-integration-eventhubs 的聯機可設定屬性:

財產 類型 描述
spring.cloud.azure.eventhubs.enabled 布爾 是否啟用 Azure 事件中樞。
spring.cloud.azure.eventhubs.connection-string 字串 事件中樞命名空間連接字串值。
spring.cloud.azure.eventhubs.namespace 字串 事件中樞命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成
spring.cloud.azure.eventhubs.domain-name 字串 Azure 事件中樞命名空間值的功能變數名稱。
spring.cloud.azure.eventhubs.custom-endpoint-address 字串 自訂端點位址。
spring.cloud.azure.eventhubs.shared-connection 布爾 基礎 EventProcessorClient 和 EventHubProducerAsyncClient 是否使用相同的連線。 根據預設,系統會針對每個建立的事件中樞用戶端建構及使用新的連線。

檢查點組態屬性

本節包含記憶體 Blob 服務的組態選項,用於保存分割區擁有權和檢查點資訊。

注意

從 4.0.0 版開始,當 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 未手動啟用時,將不會自動建立任何記憶體容器。

spring-cloud-azure-starter-integration-eventhubs 的檢查點設定屬性:

財產 類型 描述
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 布爾 如果不存在,是否允許建立容器。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name 字串 記憶體帳戶的名稱。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key 字串 儲存體帳戶存取金鑰。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name 字串 記憶體容器名稱。

常見的 Azure 服務 SDK 組態選項也可以針對記憶體 Blob 檢查點存放區進行設定。 Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.spring.cloud.azure.eventhubs.processor.checkpoint-store的前置詞來設定。

事件中樞處理器組態屬性

EventHubsInboundChannelAdapter 會使用 EventProcessorClient 從事件中樞取用訊息,來設定 EventProcessorClient的整體屬性,開發人員可以使用 EventHubsContainerProperties 進行設定。 請參閱下一節 如何使用 EventHubsInboundChannelAdapter

基本用法

將訊息傳送至 Azure 事件中樞

  1. 填入認證組態選項。

    • 針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • 針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • 針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

  1. 使用 DefaultMessageHandler 豆建立 EventHubsTemplate,以將訊息傳送至事件中樞。

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. 透過訊息通道建立具有上述訊息處理程式的訊息網關係結。

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用閘道傳送訊息。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

從 Azure 事件中樞接收訊息

  1. 填入認證組態選項。

  2. 建立訊息通道的 Bean 做為輸入通道。

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 EventHubsInboundChannelAdapter 豆建立 EventHubsMessageListenerContainer,以接收來自事件中樞的訊息。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. 透過之前建立的訊息通道,使用 EventHubsInboundChannelAdapter 建立訊息接收者系結。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

設定 EventHubsMessageConverter 以自定義 objectMapper

EventHubsMessageConverter 做為可設定的豆豆,讓使用者自定義 ObjectMapper。

Batch 取用者支援

若要在批次中取用來自事件中樞的訊息,與上述範例類似,除了用戶應該為 EventHubsInboundChannelAdapter設定批次取用的相關組態選項。

建立 EventHubsInboundChannelAdapter時,接聽程式模式應該設定為 BATCH。 建立 EventHubsMessageListenerContainer的 bean 時,請將檢查點模式設定為 MANUALBATCH,並視需要設定批次選項。

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

事件中樞訊息標頭

下表說明事件中樞訊息屬性如何對應至 Spring 訊息標頭。 針對 Azure 事件中樞,訊息稱為 event

在記錄接聽程式模式中,事件中樞訊息/事件屬性與 Spring Message 標頭之間的對應:

事件中樞事件屬性 Spring Message 標頭常數 類型 描述
加入佇列的時間 EventHubsHeaders#ENQUEUED_TIME 瞬間 在事件中樞分割區中加入佇列事件的瞬間,以 UTC 為單位。
抵消 EventHubsHeaders#OFFSET 從相關聯的事件中樞分割區收到事件的位移。
數據分割索引鍵 AzureHeaders#PARTITION_KEY 字串 如果在最初發佈事件時設定分割區哈希索引鍵,
數據分割標識碼 AzureHeaders#RAW_PARTITION_ID 字串 事件中樞的數據分割標識碼。
序號 EventHubsHeaders#SEQUENCE_NUMBER 在相關聯的事件中樞分割區中加入佇列時,指派給事件的序號。
最後加入佇列的事件屬性 EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties 此分割區中最後一個加入佇列事件的屬性。
AzureHeaders#CHECKPOINTER Checkpointer 特定訊息檢查點的標頭。

使用者可以剖析訊息標頭,以取得每個事件的相關信息。 若要設定事件的訊息標頭,所有自定義標頭都會放置為事件的應用程式屬性,其中標頭會設定為屬性索引鍵。 從事件中樞接收事件時,所有應用程式屬性都會轉換成訊息標頭。

注意

不支援手動設定數據分割索引鍵、加入佇列的時間、位移和序號的訊息標頭。

啟用批次取用者模式時,會列出下列特定批次訊息標頭,其中包含每個單一事件中樞事件的值清單。

在批次接聽程式模式中,事件中樞訊息/事件屬性與 Spring Message 標頭之間的對應:

事件中樞事件屬性 Spring Batch 訊息標頭常數 類型 描述
加入佇列的時間 EventHubsHeaders#ENQUEUED_TIME 立即清單 在事件中樞分割區中加入佇列時,以 UTC 為單位的立即清單。
抵消 EventHubsHeaders#OFFSET Long 清單 從相關聯的事件中樞分割區收到每個事件的位移清單。
數據分割索引鍵 AzureHeaders#PARTITION_KEY 字串清單 如果在最初發佈每個事件時設定分割區哈希索引鍵的清單。
序號 EventHubsHeaders#SEQUENCE_NUMBER Long 清單 在相關聯的事件中樞分割區中加入佇列時,指派給每個事件的序號清單。
系統屬性 EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES 地圖清單 每個事件的系統屬性清單。
應用程式屬性 EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES 地圖清單 每個事件的應用程式屬性清單,其中會放置所有自定義訊息標頭或事件屬性。

注意

發佈訊息時,如果存在,則會從訊息中移除上述所有批次標頭。

樣品

如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫

Spring 與 Azure 服務總線整合

重要概念

Spring Integration 可在 Spring 型應用程式中啟用輕量型傳訊,並支援透過宣告式配接器與外部系統整合。

Azure 服務總線延伸模組專案的 Spring Integration 提供 Azure 服務總線的輸入和輸出通道配接器。

注意

CompletableFuture 支援 API 已從 2.10.0 版淘汰,並由 4.0.0 版的 Reactor Core 取代。 如需詳細資訊,請參閱 Javadoc。

相依性設定

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

配置

此入門提供下列 2 個部分的組態選項:

聯機組態屬性

本節包含用來連線到 Azure 服務總線的組態選項。

注意

如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。

spring-cloud-azure-starter-integration-servicebus 的連線可設定屬性:

財產 類型 描述
spring.cloud.azure.servicebus.enabled 布爾 是否啟用 Azure 服務總線。
spring.cloud.azure.servicebus.connection-string 字串 服務總線命名空間連接字串值。
spring.cloud.azure.servicebus.custom-endpoint-address 字串 連接到服務總線時要使用的自訂端點位址。
spring.cloud.azure.servicebus.namespace 字串 服務總線命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成
spring.cloud.azure.servicebus.domain-name 字串 Azure 服務總線命名空間值的功能變數名稱。

服務總線處理器組態屬性

ServiceBusInboundChannelAdapter 會使用 ServiceBusProcessorClient 來取用訊息,來設定 ServiceBusProcessorClient的整體屬性,開發人員可以使用 ServiceBusContainerProperties 來進行設定。 請參閱下一節 如何使用 ServiceBusInboundChannelAdapter

基本用法

將訊息傳送至 Azure 服務總線

  1. 填入認證組態選項。

    • 針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

  • 針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

  1. 使用 DefaultMessageHandler 豆建立 ServiceBusTemplate,以將訊息傳送至服務總線,並設定 ServiceBusTemplate 的實體類型。 此範例會採用服務總線佇列作為範例。

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. 透過訊息通道建立具有上述訊息處理程式的訊息網關係結。

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用閘道傳送訊息。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

從 Azure 服務總線接收訊息

  1. 填入認證組態選項。

  2. 建立訊息通道的 Bean 做為輸入通道。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 ServiceBusInboundChannelAdapter 豆建立 ServiceBusMessageListenerContainer,以接收服務總線的訊息。 此範例會採用服務總線佇列作為範例。

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. 透過我們先前建立的訊息通道,建立具有 ServiceBusInboundChannelAdapter 的訊息接收者系結。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

設定 ServiceBusMessageConverter 以自定義 objectMapper

ServiceBusMessageConverter 會設定為可設定的豆子,讓使用者自定義 ObjectMapper

服務總線訊息標頭

對於一些可以對應至多個 Spring 標頭常數的服務總線標頭,會列出不同 Spring 標頭的優先順序。

服務總線標頭與 Spring 標頭之間的對應:

服務總線訊息標頭和屬性 Spring message header 常數 類型 配置 描述
內容類型 MessageHeaders#CONTENT_TYPE 字串 是的 訊息的RFC2045內容類型描述元。
相互關聯標識碼 ServiceBusMessageHeaders#CORRELATION_ID 字串 是的 訊息的相互關聯標識碼
訊息標識碼 ServiceBusMessageHeaders#MESSAGE_ID 字串 是的 訊息的訊息識別碼,此標頭的優先順序高於 MessageHeaders#ID
訊息標識碼 MessageHeaders#ID UUID 是的 訊息的訊息識別碼,此標頭的優先順序低於 ServiceBusMessageHeaders#MESSAGE_ID
數據分割索引鍵 ServiceBusMessageHeaders#PARTITION_KEY 字串 是的 將訊息傳送至數據分割實體的數據分割索引鍵。
回復 MessageHeaders#REPLY_CHANNEL 字串 是的 要傳送回復之實體的位址。
回復會話標識碼 ServiceBusMessageHeaders#REPLY_TO_SESSION_ID 字串 是的 訊息的 ReplyToGroupId 屬性值。
排程加入佇列時間 utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime 是的 應該在服務總線中加入佇列訊息的日期時間,此標頭的優先順序高於 AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE
排程加入佇列時間 utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 整數 是的 應該在服務總線中加入佇列訊息的日期時間,此標頭的優先順序低於 ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME
會話標識碼 ServiceBusMessageHeaders#SESSION_ID 字串 是的 會話感知實體的會話 IDentifier。
存留時間 ServiceBusMessageHeaders#TIME_TO_LIVE 期間 是的 此訊息到期之前的持續時間。
ServiceBusMessageHeaders#TO 字串 是的 訊息的「收件者」位址,保留供未來在路由案例中使用,且目前由訊息代理程式本身忽略。
主題 ServiceBusMessageHeaders#SUBJECT 字串 是的 訊息的主旨。
寄不出的信件錯誤描述 ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION 字串 已寄不出的訊息描述。
寄不出的信件原因 ServiceBusMessageHeaders#DEAD_LETTER_REASON 字串 訊息寄不出的信件原因。
寄不出的信件來源 ServiceBusMessageHeaders#DEAD_LETTER_SOURCE 字串 訊息已寄不出的實體。
傳遞計數 ServiceBusMessageHeaders#DELIVERY_COUNT 此訊息傳遞至客戶端的次數。
加入佇列的序號 ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER 依服務總線指派給訊息的佇列序號。
加入佇列的時間 ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime 此訊息在服務總線中加入佇列的日期時間。
到期時間: ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime 此訊息到期的日期時間。
鎖定令牌 ServiceBusMessageHeaders#LOCK_TOKEN 字串 目前訊息的鎖定令牌。
鎖定直到 ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime 此訊息鎖定到期的日期時間。
序號 ServiceBusMessageHeaders#SEQUENCE_NUMBER 服務總線指派給訊息的唯一號碼。
ServiceBusMessageHeaders#STATE ServiceBusMessageState 訊息的狀態,可以是 [作用中]、[延遲] 或 [已排程]。

數據分割索引鍵支援

此入門支援 服務總線分割,方法是允許在訊息標頭中設定分割區索引鍵和會話標識符。 本節介紹如何設定訊息的數據分割索引鍵。

建議:使用 ServiceBusMessageHeaders.PARTITION_KEY 作為標頭的索引鍵。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

不建議,但目前支援:AzureHeaders.PARTITION_KEY 做為標頭的索引鍵。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

注意

當訊息標頭中同時設定 ServiceBusMessageHeaders.PARTITION_KEYAzureHeaders.PARTITION_KEY 時,建議使用 ServiceBusMessageHeaders.PARTITION_KEY

會話支援

此範例示範如何在應用程式中手動設定訊息的會話標識碼。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

注意

當訊息標頭中設定 ServiceBusMessageHeaders.SESSION_ID,而且也會設定不同的 ServiceBusMessageHeaders.PARTITION_KEY 標頭時,會話標識碼的值最終將用來覆寫分割區索引鍵的值。

自訂服務總線客戶端屬性

開發人員可以使用 AzureServiceClientBuilderCustomizer 來自定義服務總線用戶端屬性。 下列範例會自訂 sessionIdleTimeout中的 ServiceBusClientBuilder 屬性:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

樣品

如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫

Spring 與 Azure 記憶體佇列整合

重要概念

Azure 佇列記憶體是用來儲存大量訊息的服務。 您可以使用 HTTP 或 HTTPS 透過已驗證的呼叫,從世界各地存取訊息。 佇列訊息的大小最多可達 64 KB。 佇列可能包含數百萬則訊息,最多可達記憶體帳戶的總容量限制。 佇列通常用來建立待處理工作待辦專案,以異步處理。

相依性設定

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

配置

此入門提供下列組態選項:

聯機組態屬性

本節包含用來連線到 Azure 記憶體佇列的組態選項。

注意

如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。

spring-cloud-azure-starter-integration-storage-queue 的聯機可設定屬性:

財產 類型 描述
spring.cloud.azure.storage.queue.enabled 布爾 是否啟用 Azure 記憶體佇列。
spring.cloud.azure.storage.queue.connection-string 字串 記憶體佇列命名空間連接字串值。
spring.cloud.azure.storage.queue.accountName 字串 記憶體佇列帳戶名稱。
spring.cloud.azure.storage.queue.accountKey 字串 記憶體仱列帳戶金鑰。
spring.cloud.azure.storage.queue.endpoint 字串 記憶體佇列服務端點。
spring.cloud.azure.storage.queue.sasToken 字串 Sas 令牌認證
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion 發出 API 要求時所使用的 QueueServiceVersion。
spring.cloud.azure.storage.queue.messageEncoding 字串 佇列訊息編碼。

基本用法

將訊息傳送至 Azure 記憶體佇列

  1. 填入認證組態選項。

    • 針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • 針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

  • 針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

注意

tenant-id 允許的值包括:commonorganizationsconsumers或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID

  1. 使用 DefaultMessageHandler 豆建立 StorageQueueTemplate,以將訊息傳送至記憶體佇列。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. 透過訊息通道,使用上述訊息處理程式建立訊息網關係結。

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用閘道傳送訊息。

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

從 Azure 記憶體佇列接收訊息

  1. 填入認證組態選項。

  2. 建立訊息通道的 Bean 做為輸入通道。

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 StorageQueueMessageSource 豆建立 StorageQueueTemplate,以接收記憶體佇列的訊息。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. 透過我們先前建立的訊息通道,建立訊息接收者系結,並在最後一個步驟中建立 StorageQueueMessageSource。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

樣品

如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫