Spring Cloud Azure 對 Spring Integration 的支援
本文適用於:✅ 4.19.0 ✅ 5.19.0 版
適用於 Azure 的 Spring Integration Extension 提供適用於 Java
-
spring-cloud-azure-starter-integration-eventhubs
- 如需詳細資訊,請參閱 Spring Integration with Azure Event Hubs -
spring-cloud-azure-starter-integration-servicebus
- 如需詳細資訊,請參閱 Spring Integration with Azure Service Bus -
spring-cloud-azure-starter-integration-storage-queue
- 如需詳細資訊,請參閱 Spring Integration with Azure Storage Queue
Spring 與 Azure 事件中樞整合
重要概念
Azure 事件中樞是巨量數據串流平臺和事件擷取服務。 它可以每秒接收和處理數百萬個事件。 傳送至事件中樞的數據可以使用任何即時分析提供者或批處理/記憶體配接器來轉換和儲存。
Spring Integration 可在 Spring 型應用程式中啟用輕量型傳訊,並支援透過宣告式配接器與外部系統整合。 這些配接器會針對 Spring 支援遠端、傳訊和排程,提供更高層級的抽象概念。 事件中樞 擴充專案的
注意
RxJava 支援 API 會從 4.0.0 版卸除。 如需詳細資訊,請參閱 Javadoc。
取用者群組
事件中樞提供與 Apache Kafka 類似的取用者群組支援,但邏輯稍有不同。 雖然 Kafka 會將所有認可的位移儲存在訊息代理程式中,但您必須手動儲存事件中樞訊息的位移。 事件中樞 SDK 提供函式,以將這類位移儲存在 Azure 記憶體內。
數據分割支援
事件中樞提供與 Kafka 類似的實體分割區概念。 但與 Kafka 在取用者和分割區之間的自動重新平衡不同,事件中樞提供一種先佔模式。 記憶體帳戶可作為租用,以判斷哪一個取用者擁有哪個分割區。 當新的取用者啟動時,它會嘗試從最繁重的取用者竊取一些分割區,以達到工作負載平衡。
若要指定負載平衡策略,開發人員可以使用 EventHubsContainerProperties
來進行設定。 如需如何設定
Batch 取用者支援
EventHubsInboundChannelAdapter
支援批次取用模式。 若要啟用,用戶可以在建構 ListenerMode.BATCH
實例時,將接聽程式模式指定為 EventHubsInboundChannelAdapter
。
啟用時,訊息,其中承載會接收並傳遞至下游通道的批次事件清單。 每個訊息標頭也會轉換成清單,其中內容是從每個事件剖析的相關聯標頭值。 針對分割區標識碼、檢查點器和最後一個加入佇列屬性的公用標頭,它們會以單一值呈現給整個事件批次共用相同的值。 如需詳細資訊,請參閱 事件中樞訊息標頭 一節。
注意
只有在使用 MANUAL 檢查點模式時,才存在檢查點標頭
批次取用者的檢查點支援兩種模式:BATCH
和 MANUAL
。
BATCH
模式是一種自動檢查點模式,會在收到事件后,將整個事件批次一起檢查點。
MANUAL
模式是讓使用者檢查事件。 使用時,Checkpointer 會傳遞至訊息標頭,而且使用者可以使用它來執行檢查點。
批次取用原則可由 max-size
和 max-wait-time
的屬性指定,其中 max-size
是選擇性 max-wait-time
時的必要屬性。
若要指定批次取用策略,開發人員可以使用 EventHubsContainerProperties
來進行設定。 如需如何設定
相依性設定
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
配置
此入門提供下列 3 個部分的組態選項:
聯機組態屬性
本節包含用來連線到 Azure 事件中樞的組態選項。
注意
如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。
spring-cloud-azure-starter-integration-eventhubs 的聯機可設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.enabled | 布爾 | 是否啟用 Azure 事件中樞。 |
spring.cloud.azure.eventhubs.connection-string | 字串 | 事件中樞命名空間連接字串值。 |
spring.cloud.azure.eventhubs.namespace | 字串 | 事件中樞命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成 |
spring.cloud.azure.eventhubs.domain-name | 字串 | Azure 事件中樞命名空間值的功能變數名稱。 |
spring.cloud.azure.eventhubs.custom-endpoint-address | 字串 | 自訂端點位址。 |
spring.cloud.azure.eventhubs.shared-connection | 布爾 | 基礎 EventProcessorClient 和 EventHubProducerAsyncClient 是否使用相同的連線。 根據預設,系統會針對每個建立的事件中樞用戶端建構及使用新的連線。 |
檢查點組態屬性
本節包含記憶體 Blob 服務的組態選項,用於保存分割區擁有權和檢查點資訊。
注意
從 4.0.0 版開始,當 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 未手動啟用時,將不會自動建立任何記憶體容器。
spring-cloud-azure-starter-integration-eventhubs 的檢查點設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | 布爾 | 如果不存在,是否允許建立容器。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | 字串 | 記憶體帳戶的名稱。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | 字串 | 儲存體帳戶存取金鑰。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | 字串 | 記憶體容器名稱。 |
常見的 Azure 服務 SDK 組態選項也可以針對記憶體 Blob 檢查點存放區進行設定。
Spring Cloud Azure 組態中引進支援的組態選項,而且可以使用統一前置詞 spring.cloud.azure.
或 spring.cloud.azure.eventhubs.processor.checkpoint-store
的前置詞來設定。
事件中樞處理器組態屬性
EventHubsInboundChannelAdapter
會使用 EventProcessorClient
從事件中樞取用訊息,來設定 EventProcessorClient
的整體屬性,開發人員可以使用 EventHubsContainerProperties
進行設定。 請參閱下一節 如何使用 EventHubsInboundChannelAdapter
。
基本用法
將訊息傳送至 Azure 事件中樞
填入認證組態選項。
針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
使用
DefaultMessageHandler
豆建立EventHubsTemplate
,以將訊息傳送至事件中樞。class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
透過訊息通道建立具有上述訊息處理程式的訊息網關係結。
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
使用閘道傳送訊息。
class Demo { public void demo() { this.messagingGateway.send(message); } }
從 Azure 事件中樞接收訊息
填入認證組態選項。
建立訊息通道的 Bean 做為輸入通道。
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
使用
EventHubsInboundChannelAdapter
豆建立EventHubsMessageListenerContainer
,以接收來自事件中樞的訊息。@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
透過之前建立的訊息通道,使用 EventHubsInboundChannelAdapter 建立訊息接收者系結。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
設定 EventHubsMessageConverter 以自定義 objectMapper
EventHubsMessageConverter
做為可設定的豆豆,讓使用者自定義 ObjectMapper。
Batch 取用者支援
若要在批次中取用來自事件中樞的訊息,與上述範例類似,除了用戶應該為 EventHubsInboundChannelAdapter
設定批次取用的相關組態選項。
建立 EventHubsInboundChannelAdapter
時,接聽程式模式應該設定為 BATCH
。 建立 EventHubsMessageListenerContainer
的 bean 時,請將檢查點模式設定為 MANUAL
或 BATCH
,並視需要設定批次選項。
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
事件中樞訊息標頭
下表說明事件中樞訊息屬性如何對應至 Spring 訊息標頭。 針對 Azure 事件中樞,訊息稱為 event
。
在記錄接聽程式模式中,事件中樞訊息/事件屬性與 Spring Message 標頭之間的對應:
事件中樞事件屬性 | Spring Message 標頭常數 | 類型 | 描述 |
---|---|---|---|
加入佇列的時間 | EventHubsHeaders#ENQUEUED_TIME | 瞬間 | 在事件中樞分割區中加入佇列事件的瞬間,以 UTC 為單位。 |
抵消 | EventHubsHeaders#OFFSET | 長 | 從相關聯的事件中樞分割區收到事件的位移。 |
數據分割索引鍵 | AzureHeaders#PARTITION_KEY | 字串 | 如果在最初發佈事件時設定分割區哈希索引鍵, |
數據分割標識碼 | AzureHeaders#RAW_PARTITION_ID | 字串 | 事件中樞的數據分割標識碼。 |
序號 | EventHubsHeaders#SEQUENCE_NUMBER | 長 | 在相關聯的事件中樞分割區中加入佇列時,指派給事件的序號。 |
最後加入佇列的事件屬性 | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | 此分割區中最後一個加入佇列事件的屬性。 |
那 | AzureHeaders#CHECKPOINTER | Checkpointer | 特定訊息檢查點的標頭。 |
使用者可以剖析訊息標頭,以取得每個事件的相關信息。 若要設定事件的訊息標頭,所有自定義標頭都會放置為事件的應用程式屬性,其中標頭會設定為屬性索引鍵。 從事件中樞接收事件時,所有應用程式屬性都會轉換成訊息標頭。
注意
不支援手動設定數據分割索引鍵、加入佇列的時間、位移和序號的訊息標頭。
啟用批次取用者模式時,會列出下列特定批次訊息標頭,其中包含每個單一事件中樞事件的值清單。
在批次接聽程式模式中,事件中樞訊息/事件屬性與 Spring Message 標頭之間的對應:
事件中樞事件屬性 | Spring Batch 訊息標頭常數 | 類型 | 描述 |
---|---|---|---|
加入佇列的時間 | EventHubsHeaders#ENQUEUED_TIME | 立即清單 | 在事件中樞分割區中加入佇列時,以 UTC 為單位的立即清單。 |
抵消 | EventHubsHeaders#OFFSET | Long 清單 | 從相關聯的事件中樞分割區收到每個事件的位移清單。 |
數據分割索引鍵 | AzureHeaders#PARTITION_KEY | 字串清單 | 如果在最初發佈每個事件時設定分割區哈希索引鍵的清單。 |
序號 | EventHubsHeaders#SEQUENCE_NUMBER | Long 清單 | 在相關聯的事件中樞分割區中加入佇列時,指派給每個事件的序號清單。 |
系統屬性 | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | 地圖清單 | 每個事件的系統屬性清單。 |
應用程式屬性 | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | 地圖清單 | 每個事件的應用程式屬性清單,其中會放置所有自定義訊息標頭或事件屬性。 |
注意
發佈訊息時,如果存在,則會從訊息中移除上述所有批次標頭。
樣品
如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫
Spring 與 Azure 服務總線整合
重要概念
Spring Integration 可在 Spring 型應用程式中啟用輕量型傳訊,並支援透過宣告式配接器與外部系統整合。
Azure 服務總線延伸模組專案的 Spring Integration 提供 Azure 服務總線的輸入和輸出通道配接器。
注意
CompletableFuture 支援 API 已從 2.10.0 版淘汰,並由 4.0.0 版的 Reactor Core 取代。 如需詳細資訊,請參閱 Javadoc。
相依性設定
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
配置
此入門提供下列 2 個部分的組態選項:
聯機組態屬性
本節包含用來連線到 Azure 服務總線的組態選項。
注意
如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。
spring-cloud-azure-starter-integration-servicebus 的連線可設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.azure.servicebus.enabled | 布爾 | 是否啟用 Azure 服務總線。 |
spring.cloud.azure.servicebus.connection-string | 字串 | 服務總線命名空間連接字串值。 |
spring.cloud.azure.servicebus.custom-endpoint-address | 字串 | 連接到服務總線時要使用的自訂端點位址。 |
spring.cloud.azure.servicebus.namespace | 字串 | 服務總線命名空間值,這是 FQDN 的前置詞。 FQDN 應該由 NamespaceName.DomainName 組成 |
spring.cloud.azure.servicebus.domain-name | 字串 | Azure 服務總線命名空間值的功能變數名稱。 |
服務總線處理器組態屬性
ServiceBusInboundChannelAdapter
會使用 ServiceBusProcessorClient
來取用訊息,來設定 ServiceBusProcessorClient
的整體屬性,開發人員可以使用 ServiceBusContainerProperties
來進行設定。 請參閱下一節 如何使用 ServiceBusInboundChannelAdapter
。
基本用法
將訊息傳送至 Azure 服務總線
填入認證組態選項。
針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
使用
DefaultMessageHandler
豆建立ServiceBusTemplate
,以將訊息傳送至服務總線,並設定 ServiceBusTemplate 的實體類型。 此範例會採用服務總線佇列作為範例。class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
透過訊息通道建立具有上述訊息處理程式的訊息網關係結。
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
使用閘道傳送訊息。
class Demo { public void demo() { this.messagingGateway.send(message); } }
從 Azure 服務總線接收訊息
填入認證組態選項。
建立訊息通道的 Bean 做為輸入通道。
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
使用
ServiceBusInboundChannelAdapter
豆建立ServiceBusMessageListenerContainer
,以接收服務總線的訊息。 此範例會採用服務總線佇列作為範例。@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
透過我們先前建立的訊息通道,建立具有
ServiceBusInboundChannelAdapter
的訊息接收者系結。class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
設定 ServiceBusMessageConverter 以自定義 objectMapper
ServiceBusMessageConverter
會設定為可設定的豆子,讓使用者自定義 ObjectMapper
。
服務總線訊息標頭
對於一些可以對應至多個 Spring 標頭常數的服務總線標頭,會列出不同 Spring 標頭的優先順序。
服務總線標頭與 Spring 標頭之間的對應:
服務總線訊息標頭和屬性 | Spring message header 常數 | 類型 | 配置 | 描述 |
---|---|---|---|---|
內容類型 | MessageHeaders#CONTENT_TYPE |
字串 | 是的 | 訊息的RFC2045內容類型描述元。 |
相互關聯標識碼 | ServiceBusMessageHeaders#CORRELATION_ID |
字串 | 是的 | 訊息的相互關聯標識碼 |
訊息標識碼 | ServiceBusMessageHeaders#MESSAGE_ID |
字串 | 是的 | 訊息的訊息識別碼,此標頭的優先順序高於 MessageHeaders#ID 。 |
訊息標識碼 | MessageHeaders#ID |
UUID | 是的 | 訊息的訊息識別碼,此標頭的優先順序低於 ServiceBusMessageHeaders#MESSAGE_ID 。 |
數據分割索引鍵 | ServiceBusMessageHeaders#PARTITION_KEY |
字串 | 是的 | 將訊息傳送至數據分割實體的數據分割索引鍵。 |
回復 | MessageHeaders#REPLY_CHANNEL |
字串 | 是的 | 要傳送回復之實體的位址。 |
回復會話標識碼 | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
字串 | 是的 | 訊息的 ReplyToGroupId 屬性值。 |
排程加入佇列時間 utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | 是的 | 應該在服務總線中加入佇列訊息的日期時間,此標頭的優先順序高於 AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 。 |
排程加入佇列時間 utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
整數 | 是的 | 應該在服務總線中加入佇列訊息的日期時間,此標頭的優先順序低於 ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME 。 |
會話標識碼 | ServiceBusMessageHeaders#SESSION_ID |
字串 | 是的 | 會話感知實體的會話 IDentifier。 |
存留時間 | ServiceBusMessageHeaders#TIME_TO_LIVE |
期間 | 是的 | 此訊息到期之前的持續時間。 |
自 | ServiceBusMessageHeaders#TO |
字串 | 是的 | 訊息的「收件者」位址,保留供未來在路由案例中使用,且目前由訊息代理程式本身忽略。 |
主題 | ServiceBusMessageHeaders#SUBJECT |
字串 | 是的 | 訊息的主旨。 |
寄不出的信件錯誤描述 | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
字串 | 不 | 已寄不出的訊息描述。 |
寄不出的信件原因 | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
字串 | 不 | 訊息寄不出的信件原因。 |
寄不出的信件來源 | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
字串 | 不 | 訊息已寄不出的實體。 |
傳遞計數 | ServiceBusMessageHeaders#DELIVERY_COUNT |
長 | 不 | 此訊息傳遞至客戶端的次數。 |
加入佇列的序號 | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
長 | 不 | 依服務總線指派給訊息的佇列序號。 |
加入佇列的時間 | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | 不 | 此訊息在服務總線中加入佇列的日期時間。 |
到期時間: | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | 不 | 此訊息到期的日期時間。 |
鎖定令牌 | ServiceBusMessageHeaders#LOCK_TOKEN |
字串 | 不 | 目前訊息的鎖定令牌。 |
鎖定直到 | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | 不 | 此訊息鎖定到期的日期時間。 |
序號 | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
長 | 不 | 服務總線指派給訊息的唯一號碼。 |
州 | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | 不 | 訊息的狀態,可以是 [作用中]、[延遲] 或 [已排程]。 |
數據分割索引鍵支援
此入門支援 服務總線分割,方法是允許在訊息標頭中設定分割區索引鍵和會話標識符。 本節介紹如何設定訊息的數據分割索引鍵。
建議:使用 ServiceBusMessageHeaders.PARTITION_KEY
作為標頭的索引鍵。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
不建議,但目前支援:AzureHeaders.PARTITION_KEY
做為標頭的索引鍵。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
注意
當訊息標頭中同時設定 ServiceBusMessageHeaders.PARTITION_KEY
和 AzureHeaders.PARTITION_KEY
時,建議使用 ServiceBusMessageHeaders.PARTITION_KEY
。
會話支援
此範例示範如何在應用程式中手動設定訊息的會話標識碼。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
注意
當訊息標頭中設定 ServiceBusMessageHeaders.SESSION_ID
,而且也會設定不同的 ServiceBusMessageHeaders.PARTITION_KEY
標頭時,會話標識碼的值最終將用來覆寫分割區索引鍵的值。
自訂服務總線客戶端屬性
開發人員可以使用 AzureServiceClientBuilderCustomizer
來自定義服務總線用戶端屬性。 下列範例會自訂 sessionIdleTimeout
中的 ServiceBusClientBuilder
屬性:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
樣品
如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫
Spring 與 Azure 記憶體佇列整合
重要概念
Azure 佇列記憶體是用來儲存大量訊息的服務。 您可以使用 HTTP 或 HTTPS 透過已驗證的呼叫,從世界各地存取訊息。 佇列訊息的大小最多可達 64 KB。 佇列可能包含數百萬則訊息,最多可達記憶體帳戶的總容量限制。 佇列通常用來建立待處理工作待辦專案,以異步處理。
相依性設定
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
配置
此入門提供下列組態選項:
聯機組態屬性
本節包含用來連線到 Azure 記憶體佇列的組態選項。
注意
如果您選擇使用安全性主體向 Microsoft Entra ID 進行驗證和授權,以存取 Azure 資源,請參閱 使用 Microsoft Entra ID 授權存取,以確保安全性主體已獲得存取 Azure 資源的足夠許可權。
spring-cloud-azure-starter-integration-storage-queue 的聯機可設定屬性:
財產 | 類型 | 描述 |
---|---|---|
spring.cloud.azure.storage.queue.enabled | 布爾 | 是否啟用 Azure 記憶體佇列。 |
spring.cloud.azure.storage.queue.connection-string | 字串 | 記憶體佇列命名空間連接字串值。 |
spring.cloud.azure.storage.queue.accountName | 字串 | 記憶體佇列帳戶名稱。 |
spring.cloud.azure.storage.queue.accountKey | 字串 | 記憶體仱列帳戶金鑰。 |
spring.cloud.azure.storage.queue.endpoint | 字串 | 記憶體佇列服務端點。 |
spring.cloud.azure.storage.queue.sasToken | 字串 | Sas 令牌認證 |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | 發出 API 要求時所使用的 QueueServiceVersion。 |
spring.cloud.azure.storage.queue.messageEncoding | 字串 | 佇列訊息編碼。 |
基本用法
將訊息傳送至 Azure 記憶體佇列
填入認證組態選項。
針對認證作為連接字串,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
針對認證作為受控識別,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
針對認證即服務主體,請在 application.yml 檔案中設定下列屬性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
注意
tenant-id
允許的值包括:common
、organizations
、consumers
或租用戶標識碼。 如需這些值的詳細資訊,請參閱 錯誤AADSTS50020 - 來自身分識別提供者的用戶帳戶不存在於租使用者的 使用錯誤的端點(個人和組織帳戶)一節。 如需轉換單一租使用者應用程式的資訊,請參閱 將單一租使用者應用程式轉換成多租使用者Microsoft Entra ID。
使用
DefaultMessageHandler
豆建立StorageQueueTemplate
,以將訊息傳送至記憶體佇列。class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
透過訊息通道,使用上述訊息處理程式建立訊息網關係結。
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
使用閘道傳送訊息。
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
從 Azure 記憶體佇列接收訊息
填入認證組態選項。
建立訊息通道的 Bean 做為輸入通道。
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
使用
StorageQueueMessageSource
豆建立StorageQueueTemplate
,以接收記憶體佇列的訊息。class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
透過我們先前建立的訊息通道,建立訊息接收者系結,並在最後一個步驟中建立 StorageQueueMessageSource。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
樣品
如需詳細資訊,請參閱 GitHub 上的 azure-spring-boot-samples 存放庫