Spring Cloud Azure 对 Spring Integration 的支持
本文适用于:✅ 版本 4.19.0 ✅ 版本 5.21.0
适用于 Azure 的 Spring Integration 扩展为 Azure SDK for Java提供的各种服务的 Spring Integration 适配器。 我们为这些 Azure 服务提供 Spring Integration 支持:事件中心、服务总线、存储队列。 下面是支持的适配器列表:
-
spring-cloud-azure-starter-integration-eventhubs
- 有关详细信息,请参阅 Spring Integration 与 Azure 事件中心 -
spring-cloud-azure-starter-integration-servicebus
- 有关详细信息,请参阅 Spring Integration 与 Azure 服务总线 -
spring-cloud-azure-starter-integration-storage-queue
- 有关详细信息,请参阅 Spring Integration 与 Azure 存储队列
Spring 与 Azure 事件中心集成
关键概念
Azure 事件中心是一个大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器来转换和存储发送到事件中心的数据。
Spring Integration 在基于 Spring 的应用程序中启用轻型消息传送,并支持通过声明性适配器与外部系统集成。 这些适配器针对 Spring 对远程处理、消息传递和计划的支持提供了更高级别的抽象。 事件中心 扩展项目的
注意
RxJava 支持 API 从版本 4.0.0 中删除。 有关详细信息,请参阅 Javadoc。
使用者组
事件中心提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。 当 Kafka 将所有已提交的偏移量存储在中转站中时,必须存储正在手动处理的事件中心消息的偏移量。 事件中心 SDK 提供用于在 Azure 存储中存储此类偏移量的函数。
分区支持
事件中心提供与 Kafka 类似的物理分区概念。 但是,与 Kafka 在使用者和分区之间自动重新平衡不同,事件中心提供了一种抢占模式。 存储帐户充当租约,以确定哪个分区由哪个使用者拥有。 当新的使用者启动时,它将尝试从大多数重负载的使用者中窃取某些分区,以实现工作负荷均衡。
若要指定负载均衡策略,开发人员可以使用 EventHubsContainerProperties
进行配置。 有关如何配置
Batch 使用者支持
EventHubsInboundChannelAdapter
支持批处理使用模式。 若要启用它,用户可以在构造 ListenerMode.BATCH
实例时将侦听器模式指定为 EventHubsInboundChannelAdapter
。
启用后,消息,其中有效负载是批处理事件列表,并传递到下游通道。 每个消息标头也会转换为列表,其中内容是从每个事件分析的关联标头值。 对于分区 ID、检查点器和最后一个排队属性的公共标头,它们显示为整个事件批次的单个值共享同一个值。 有关详细信息,请参阅 事件中心消息标头 部分。
注意
仅当使用 manual 检查点模式时,才存在检查点标头
批处理使用者的检查点支持两种模式:BATCH
和 MANUAL
。
BATCH
模式是一种自动检查点模式,用于在收到事件后将整批事件一起检查点。
MANUAL
模式是检查用户的事件。 使用时,检查点器 将传递到消息标头中,用户可以使用它执行检查点。
批处理使用策略可由 max-size
和 max-wait-time
的属性指定,其中 max-size
是可选 max-wait-time
时的必要属性。
若要指定批处理使用策略,开发人员可以使用 EventHubsContainerProperties
进行配置。 有关如何配置
依赖项设置
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
配置
此初学者提供以下 3 部分的配置选项:
连接配置属性
本部分包含用于连接到 Azure 事件中心的配置选项。
注意
如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。
spring-cloud-azure-starter-integration-eventhubs 的连接可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.enabled | 布尔 | 是否启用 Azure 事件中心。 |
spring.cloud.azure.eventhubs.connection-string | 字符串 | 事件中心命名空间连接字符串值。 |
spring.cloud.azure.eventhubs.namespace | 字符串 | 事件中心命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成 |
spring.cloud.azure.eventhubs.domain-name | 字符串 | Azure 事件中心命名空间值的域名。 |
spring.cloud.azure.eventhubs.custom-endpoint-address | 字符串 | 自定义终结点地址。 |
spring.cloud.azure.eventhubs.shared-connection | 布尔 | 基础 EventProcessorClient 和 EventHubProducerAsyncClient 是否使用相同的连接。 默认情况下,为创建的每个事件中心客户端构造并使用新连接。 |
检查点配置属性
本部分包含存储 Blob 服务的配置选项,用于保存分区所有权和检查点信息。
注意
从版本 4.0.0 开始,如果未手动启用 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 的属性,则不会自动创建任何存储容器。
检查点 spring-cloud-azure-starter-integration-eventhubs 的可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | 布尔 | 是否允许创建容器(如果不存在)。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | 字符串 | 存储帐户的名称。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | 字符串 | 存储帐户访问密钥。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | 字符串 | 存储容器名称。 |
常见的 Azure 服务 SDK 配置选项也可用于存储 Blob 检查点存储。
Spring Cloud Azure 配置中引入了受支持的配置选项,可以使用统一前缀 spring.cloud.azure.
或 spring.cloud.azure.eventhubs.processor.checkpoint-store
前缀进行配置。
事件中心处理器配置属性
EventHubsInboundChannelAdapter
使用 EventProcessorClient
从事件中心使用消息,以配置 EventProcessorClient
的整体属性,开发人员可以使用 EventHubsContainerProperties
进行配置。 请参阅以下 了解如何使用 EventHubsInboundChannelAdapter
。
基本用法
将消息发送到 Azure 事件中心
填写凭据配置选项。
对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
注意
Microsoft 建议使用最安全的可用身份验证流。 本过程中介绍的身份验证流程(例如数据库、缓存、消息传送或 AI 服务)需要非常高的信任度,并携带其他流中不存在的风险。 仅当更安全的选项(例如无密码连接或无密钥连接的托管标识)不可行时,才使用此流。 对于本地计算机操作,首选无密码连接或无密钥连接的用户标识。
对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
使用
DefaultMessageHandler
bean 创建EventHubsTemplate
,以将消息发送到事件中心。class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
通过消息通道使用上述消息处理程序创建消息网关绑定。
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
使用网关发送消息。
class Demo { public void demo() { this.messagingGateway.send(message); } }
从 Azure 事件中心接收消息
填写凭据配置选项。
创建消息通道作为输入通道的 bean。
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
使用
EventHubsInboundChannelAdapter
bean 创建EventHubsMessageListenerContainer
,以从事件中心接收消息。@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
通过之前创建的消息通道,使用 EventHubsInboundChannelAdapter 创建消息接收器绑定。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
配置 EventHubsMessageConverter 以自定义 objectMapper
EventHubsMessageConverter
是可配置的豆子,允许用户自定义 ObjectMapper。
Batch 使用者支持
若要批量使用来自事件中心的消息与上述示例类似,用户还应为 EventHubsInboundChannelAdapter
设置批处理使用的相关配置选项。
创建 EventHubsInboundChannelAdapter
时,侦听器模式应设置为 BATCH
。 创建 EventHubsMessageListenerContainer
bean 时,请将检查点模式设置为 MANUAL
或 BATCH
,并且可以根据需要配置批处理选项。
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
事件中心消息标头
下表说明了事件中心消息属性如何映射到 Spring 消息标头。 对于 Azure 事件中心,消息称为 event
。
在记录侦听器模式下的事件中心消息/事件属性和 Spring Message 标头之间映射:
事件中心事件属性 | Spring Message 标头常量 | 类型 | 描述 |
---|---|---|---|
排队时间 | EventHubsHeaders#ENQUEUED_TIME | 瞬间 | 事件在事件中心分区中排队时的即时(UTC)。 |
抵消 | EventHubsHeaders#OFFSET | 长 | 从关联的事件中心分区接收事件的偏移量。 |
分区键 | AzureHeaders#PARTITION_KEY | 字符串 | 如果最初发布事件时设置了分区哈希键,则为分区哈希键。 |
分区 ID | AzureHeaders#RAW_PARTITION_ID | 字符串 | 事件中心的分区 ID。 |
序列号 | EventHubsHeaders#SEQUENCE_NUMBER | 长 | 在关联事件中心分区中排队时分配给事件的序列号。 |
最后排队事件属性 | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | 此分区中最后一个排队事件的属性。 |
那 | AzureHeaders#CHECKPOINTER | 检查点器 | 特定消息的检查点的标头。 |
用户可以分析每个事件相关信息的消息标头。 若要为事件设置消息标头,所有自定义标头都将作为事件的应用程序属性放置,其中标头设置为属性键。 从事件中心接收事件时,所有应用程序属性都将转换为消息标头。
注意
不支持手动设置分区键、排队时间、偏移量和序列号的消息标头。
启用批处理使用者模式后,将列出批处理消息的特定标头,其中包含每个事件中心事件中的值列表。
在批处理侦听器模式下,事件中心消息/事件属性和 Spring Message 标头之间的映射:
事件中心事件属性 | Spring Batch 消息标头常量 | 类型 | 描述 |
---|---|---|---|
排队时间 | EventHubsHeaders#ENQUEUED_TIME | 即时列表 | 事件中心分区中排队的每个事件的即时列表(UTC)。 |
抵消 | EventHubsHeaders#OFFSET | Long 列表 | 从关联的事件中心分区接收每个事件的偏移量列表。 |
分区键 | AzureHeaders#PARTITION_KEY | 字符串列表 | 如果在最初发布每个事件时设置分区哈希键的列表。 |
序列号 | EventHubsHeaders#SEQUENCE_NUMBER | Long 列表 | 在关联事件中心分区中排队时分配给每个事件的序列号的列表。 |
系统属性 | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | 地图列表 | 每个事件的系统属性列表。 |
应用程序属性 | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | 地图列表 | 每个事件的应用程序属性的列表,其中放置了所有自定义的消息标头或事件属性。 |
注意
发布消息时,上述所有批处理标头都将从消息中删除(如果存在)。
样品
有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库
Spring 与 Azure 服务总线集成
关键概念
Spring Integration 在基于 Spring 的应用程序中启用轻型消息传送,并支持通过声明性适配器与外部系统集成。
Azure 服务总线扩展项目的 Spring Integration 为 Azure 服务总线提供入站和出站通道适配器。
注意
CompletableFuture 支持 API 已从版本 2.10.0 弃用,由 4.0.0 版中的 Reactor Core 取代。 有关详细信息,请参阅 Javadoc。
依赖项设置
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
配置
此初学者提供以下 2 部分的配置选项:
连接配置属性
本部分包含用于连接到 Azure 服务总线的配置选项。
注意
如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。
spring-cloud-azure-starter-integration-servicebus 的连接可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.azure.servicebus.enabled | 布尔 | 是否启用 Azure 服务总线。 |
spring.cloud.azure.servicebus.connection-string | 字符串 | 服务总线命名空间连接字符串值。 |
spring.cloud.azure.servicebus.custom-endpoint-address | 字符串 | 连接到服务总线时要使用的自定义终结点地址。 |
spring.cloud.azure.servicebus.namespace | 字符串 | 服务总线命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成 |
spring.cloud.azure.servicebus.domain-name | 字符串 | Azure 服务总线命名空间值的域名。 |
服务总线处理器配置属性
ServiceBusInboundChannelAdapter
使用 ServiceBusProcessorClient
来使用消息,配置 ServiceBusProcessorClient
的整体属性,开发人员可以使用 ServiceBusContainerProperties
进行配置。 请参阅以下 了解如何使用 ServiceBusInboundChannelAdapter
。
基本用法
将消息发送到 Azure 服务总线
填写凭据配置选项。
对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
注意
Microsoft 建议使用最安全的可用身份验证流。 本过程中介绍的身份验证流程(例如数据库、缓存、消息传送或 AI 服务)需要非常高的信任度,并携带其他流中不存在的风险。 仅当更安全的选项(例如无密码连接或无密钥连接的托管标识)不可行时,才使用此流。 对于本地计算机操作,首选无密码连接或无密钥连接的用户标识。
对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
使用
DefaultMessageHandler
bean 创建ServiceBusTemplate
以将消息发送到服务总线,并设置 ServiceBusTemplate 的实体类型。 此示例以服务总线队列为例。class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
通过消息通道使用上述消息处理程序创建消息网关绑定。
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
使用网关发送消息。
class Demo { public void demo() { this.messagingGateway.send(message); } }
从 Azure 服务总线接收消息
填写凭据配置选项。
创建消息通道作为输入通道的 bean。
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
使用
ServiceBusInboundChannelAdapter
bean 创建ServiceBusMessageListenerContainer
,以接收服务总线的消息。 此示例以服务总线队列为例。@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
通过之前创建的消息通道创建消息接收器绑定,
ServiceBusInboundChannelAdapter
。class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
配置 ServiceBusMessageConverter 以自定义 objectMapper
ServiceBusMessageConverter
作为可配置豆,允许用户自定义 ObjectMapper
。
服务总线消息标头
对于可映射到多个 Spring 标头常量的某些服务总线标头,列出了不同 Spring 标头的优先级。
服务总线标头与 Spring 标头之间的映射:
服务总线消息标头和属性 | Spring message 标头常量 | 类型 | 配置 | 描述 |
---|---|---|---|---|
内容类型 | MessageHeaders#CONTENT_TYPE |
字符串 | 是的 | 消息的RFC2045内容类型描述符。 |
相关 ID | ServiceBusMessageHeaders#CORRELATION_ID |
字符串 | 是的 | 消息的相关 ID |
消息 ID | ServiceBusMessageHeaders#MESSAGE_ID |
字符串 | 是的 | 消息的消息 ID,此标头的优先级高于 MessageHeaders#ID 。 |
消息 ID | MessageHeaders#ID |
UUID | 是的 | 消息的消息 ID,此标头的优先级低于 ServiceBusMessageHeaders#MESSAGE_ID 。 |
分区键 | ServiceBusMessageHeaders#PARTITION_KEY |
字符串 | 是的 | 用于将消息发送到分区实体的分区键。 |
回复 | MessageHeaders#REPLY_CHANNEL |
字符串 | 是的 | 要向其发送答复的实体的地址。 |
回复会话 ID | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
字符串 | 是的 | 消息的 ReplyToGroupId 属性值。 |
计划的排队时间 utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | 是的 | 应在服务总线中排队消息的日期/时间,此标头的优先级高于 AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 。 |
计划的排队时间 utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
整数 | 是的 | 应在服务总线中排队消息的日期/时间,此标头的优先级低于 ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME 。 |
会话 ID | ServiceBusMessageHeaders#SESSION_ID |
字符串 | 是的 | 会话感知实体的会话 IDentifier。 |
生存时间 | ServiceBusMessageHeaders#TIME_TO_LIVE |
期间 | 是的 | 此消息过期之前的持续时间。 |
自 | ServiceBusMessageHeaders#TO |
字符串 | 是的 | 消息的“收件人”地址,保留供将来在路由方案中使用,目前被中转站本身忽略。 |
主题 | ServiceBusMessageHeaders#SUBJECT |
字符串 | 是的 | 邮件的主题。 |
死信错误说明 | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
字符串 | 不 | 已死信的消息的说明。 |
死信原因 | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
字符串 | 不 | 消息死信的原因。 |
死信源 | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
字符串 | 不 | 消息为死信的实体。 |
传递计数 | ServiceBusMessageHeaders#DELIVERY_COUNT |
长 | 不 | 此消息传递到客户端的次数。 |
排队序列号 | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
长 | 不 | 按服务总线分配给消息的排队序列号。 |
排队时间 | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | 不 | 此消息在服务总线中排队的日期时间。 |
过期时间 | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | 不 | 此消息到期的日期/时间。 |
锁定令牌 | ServiceBusMessageHeaders#LOCK_TOKEN |
字符串 | 不 | 当前消息的锁定令牌。 |
锁定到 | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | 不 | 此消息的锁定过期的日期/时间。 |
序列号 | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
长 | 不 | 服务总线分配给消息的唯一编号。 |
州 | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | 不 | 消息的状态,可以是“活动”、“延迟”或“计划”。 |
分区键支持
此初学者通过允许在消息标头中设置分区键和会话 ID,支持 服务总线分区。 本部分介绍如何设置消息的分区键。
建议:使用 ServiceBusMessageHeaders.PARTITION_KEY
作为标头的键。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
不建议,但目前支持:AzureHeaders.PARTITION_KEY
作为标头的键。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
注意
当消息标头中同时设置 ServiceBusMessageHeaders.PARTITION_KEY
和 AzureHeaders.PARTITION_KEY
时,首选 ServiceBusMessageHeaders.PARTITION_KEY
。
会话支持
此示例演示如何在应用程序中手动设置消息的会话 ID。
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
注意
在消息标头中设置 ServiceBusMessageHeaders.SESSION_ID
并且还设置了不同的 ServiceBusMessageHeaders.PARTITION_KEY
标头时,会话 ID 的值最终将用于覆盖分区键的值。
自定义服务总线客户端属性
开发人员可以使用 AzureServiceClientBuilderCustomizer
来自定义服务总线客户端属性。 以下示例自定义 sessionIdleTimeout
中的 ServiceBusClientBuilder
属性:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
样品
有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库
Spring 与 Azure 存储队列集成
关键概念
Azure 队列存储是用于存储大量消息的服务。 使用 HTTP 或 HTTPS 通过经过身份验证的调用从世界任何地方访问消息。 队列消息的大小最多可为 64 KB。 队列可能包含数百万条消息,最大容量限制为存储帐户的总容量限制。 队列通常用于创建以异步方式处理的积压工作。
依赖项设置
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
配置
此初学者提供以下配置选项:
连接配置属性
本部分包含用于连接到 Azure 存储队列的配置选项。
注意
如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。
spring-cloud-azure-starter-integration-storage-queue 的连接可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.azure.storage.queue.enabled | 布尔 | 是否启用了 Azure 存储队列。 |
spring.cloud.azure.storage.queue.connection-string | 字符串 | 存储队列命名空间连接字符串值。 |
spring.cloud.azure.storage.queue.accountName | 字符串 | 存储帐户名称。 |
spring.cloud.azure.storage.queue.accountKey | 字符串 | 存储帐户密钥。 |
spring.cloud.azure.storage.queue.endpoint | 字符串 | 存储队列服务终结点。 |
spring.cloud.azure.storage.queue.sasToken | 字符串 | Sas 令牌凭据 |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | 发出 API 请求时使用的 QueueServiceVersion。 |
spring.cloud.azure.storage.queue.messageEncoding | 字符串 | 队列消息编码。 |
基本用法
将消息发送到 Azure 存储队列
填写凭据配置选项。
对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
注意
Microsoft 建议使用最安全的可用身份验证流。 本过程中介绍的身份验证流程(例如数据库、缓存、消息传送或 AI 服务)需要非常高的信任度,并携带其他流中不存在的风险。 仅当更安全的选项(例如无密码连接或无密钥连接的托管标识)不可行时,才使用此流。 对于本地计算机操作,首选无密码连接或无密钥连接的用户标识。
对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
使用
DefaultMessageHandler
bean 创建StorageQueueTemplate
,以将消息发送到存储队列。class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
通过消息通道创建包含上述消息处理程序的消息网关绑定。
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
使用网关发送消息。
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
从 Azure 存储队列接收消息
填写凭据配置选项。
创建消息通道作为输入通道的 bean。
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
使用
StorageQueueMessageSource
bean 创建StorageQueueTemplate
,以接收到存储队列的消息。class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
使用在上一步中创建的 StorageQueueMessageSource 通过之前创建的消息通道创建消息接收器绑定。
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
样品
有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库