Spring Cloud Azure 对 Spring Integration 的支持

本文适用于:✅ 版本 4.19.0 ✅ 版本 5.21.0

适用于 Azure 的 Spring Integration 扩展为 Azure SDK for Java提供的各种服务的 Spring Integration 适配器。 我们为这些 Azure 服务提供 Spring Integration 支持:事件中心、服务总线、存储队列。 下面是支持的适配器列表:

Spring 与 Azure 事件中心集成

关键概念

Azure 事件中心是一个大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器来转换和存储发送到事件中心的数据。

Spring Integration 在基于 Spring 的应用程序中启用轻型消息传送,并支持通过声明性适配器与外部系统集成。 这些适配器针对 Spring 对远程处理、消息传递和计划的支持提供了更高级别的抽象。 事件中心 扩展项目的 Spring Integration 为 Azure 事件中心提供入站和出站通道适配器和网关。

注意

RxJava 支持 API 从版本 4.0.0 中删除。 有关详细信息,请参阅 Javadoc。

使用者组

事件中心提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。 当 Kafka 将所有已提交的偏移量存储在中转站中时,必须存储正在手动处理的事件中心消息的偏移量。 事件中心 SDK 提供用于在 Azure 存储中存储此类偏移量的函数。

分区支持

事件中心提供与 Kafka 类似的物理分区概念。 但是,与 Kafka 在使用者和分区之间自动重新平衡不同,事件中心提供了一种抢占模式。 存储帐户充当租约,以确定哪个分区由哪个使用者拥有。 当新的使用者启动时,它将尝试从大多数重负载的使用者中窃取某些分区,以实现工作负荷均衡。

若要指定负载均衡策略,开发人员可以使用 EventHubsContainerProperties 进行配置。 有关如何配置 的示例,请参阅以下部分

Batch 使用者支持

EventHubsInboundChannelAdapter 支持批处理使用模式。 若要启用它,用户可以在构造 ListenerMode.BATCH 实例时将侦听器模式指定为 EventHubsInboundChannelAdapter。 启用后,消息,其中有效负载是批处理事件列表,并传递到下游通道。 每个消息标头也会转换为列表,其中内容是从每个事件分析的关联标头值。 对于分区 ID、检查点器和最后一个排队属性的公共标头,它们显示为整个事件批次的单个值共享同一个值。 有关详细信息,请参阅 事件中心消息标头 部分。

注意

仅当使用 manual 检查点模式时,才存在检查点标头

批处理使用者的检查点支持两种模式:BATCHMANUALBATCH 模式是一种自动检查点模式,用于在收到事件后将整批事件一起检查点。 MANUAL 模式是检查用户的事件。 使用时,检查点器 将传递到消息标头中,用户可以使用它执行检查点。

批处理使用策略可由 max-sizemax-wait-time的属性指定,其中 max-size 是可选 max-wait-time 时的必要属性。 若要指定批处理使用策略,开发人员可以使用 EventHubsContainerProperties 进行配置。 有关如何配置 的示例,请参阅以下部分

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

配置

此初学者提供以下 3 部分的配置选项:

连接配置属性

本部分包含用于连接到 Azure 事件中心的配置选项。

注意

如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。

spring-cloud-azure-starter-integration-eventhubs 的连接可配置属性:

财产 类型 描述
spring.cloud.azure.eventhubs.enabled 布尔 是否启用 Azure 事件中心。
spring.cloud.azure.eventhubs.connection-string 字符串 事件中心命名空间连接字符串值。
spring.cloud.azure.eventhubs.namespace 字符串 事件中心命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成
spring.cloud.azure.eventhubs.domain-name 字符串 Azure 事件中心命名空间值的域名。
spring.cloud.azure.eventhubs.custom-endpoint-address 字符串 自定义终结点地址。
spring.cloud.azure.eventhubs.shared-connection 布尔 基础 EventProcessorClient 和 EventHubProducerAsyncClient 是否使用相同的连接。 默认情况下,为创建的每个事件中心客户端构造并使用新连接。

检查点配置属性

本部分包含存储 Blob 服务的配置选项,用于保存分区所有权和检查点信息。

注意

从版本 4.0.0 开始,如果未手动启用 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 的属性,则不会自动创建任何存储容器。

检查点 spring-cloud-azure-starter-integration-eventhubs 的可配置属性:

财产 类型 描述
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 布尔 是否允许创建容器(如果不存在)。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name 字符串 存储帐户的名称。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key 字符串 存储帐户访问密钥。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name 字符串 存储容器名称。

常见的 Azure 服务 SDK 配置选项也可用于存储 Blob 检查点存储。 Spring Cloud Azure 配置中引入了受支持的配置选项,可以使用统一前缀 spring.cloud.azure.spring.cloud.azure.eventhubs.processor.checkpoint-store前缀进行配置。

事件中心处理器配置属性

EventHubsInboundChannelAdapter 使用 EventProcessorClient 从事件中心使用消息,以配置 EventProcessorClient的整体属性,开发人员可以使用 EventHubsContainerProperties 进行配置。 请参阅以下 了解如何使用 EventHubsInboundChannelAdapter

基本用法

将消息发送到 Azure 事件中心

  1. 填写凭据配置选项。

    • 对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      

      注意

      Microsoft 建议使用最安全的可用身份验证流。 本过程中介绍的身份验证流程(例如数据库、缓存、消息传送或 AI 服务)需要非常高的信任度,并携带其他流中不存在的风险。 仅当更安全的选项(例如无密码连接或无密钥连接的托管标识)不可行时,才使用此流。 对于本地计算机操作,首选无密码连接或无密钥连接的用户标识。

    • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • 对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

注意

允许 tenant-id 的值包括:commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。

  1. 使用 DefaultMessageHandler bean 创建 EventHubsTemplate,以将消息发送到事件中心。

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. 通过消息通道使用上述消息处理程序创建消息网关绑定。

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用网关发送消息。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

从 Azure 事件中心接收消息

  1. 填写凭据配置选项。

  2. 创建消息通道作为输入通道的 bean。

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 EventHubsInboundChannelAdapter bean 创建 EventHubsMessageListenerContainer,以从事件中心接收消息。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. 通过之前创建的消息通道,使用 EventHubsInboundChannelAdapter 创建消息接收器绑定。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

配置 EventHubsMessageConverter 以自定义 objectMapper

EventHubsMessageConverter 是可配置的豆子,允许用户自定义 ObjectMapper。

Batch 使用者支持

若要批量使用来自事件中心的消息与上述示例类似,用户还应为 EventHubsInboundChannelAdapter设置批处理使用的相关配置选项。

创建 EventHubsInboundChannelAdapter时,侦听器模式应设置为 BATCH。 创建 EventHubsMessageListenerContainerbean 时,请将检查点模式设置为 MANUALBATCH,并且可以根据需要配置批处理选项。

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

事件中心消息标头

下表说明了事件中心消息属性如何映射到 Spring 消息标头。 对于 Azure 事件中心,消息称为 event

在记录侦听器模式下的事件中心消息/事件属性和 Spring Message 标头之间映射:

事件中心事件属性 Spring Message 标头常量 类型 描述
排队时间 EventHubsHeaders#ENQUEUED_TIME 瞬间 事件在事件中心分区中排队时的即时(UTC)。
抵消 EventHubsHeaders#OFFSET 从关联的事件中心分区接收事件的偏移量。
分区键 AzureHeaders#PARTITION_KEY 字符串 如果最初发布事件时设置了分区哈希键,则为分区哈希键。
分区 ID AzureHeaders#RAW_PARTITION_ID 字符串 事件中心的分区 ID。
序列号 EventHubsHeaders#SEQUENCE_NUMBER 在关联事件中心分区中排队时分配给事件的序列号。
最后排队事件属性 EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties 此分区中最后一个排队事件的属性。
AzureHeaders#CHECKPOINTER 检查点器 特定消息的检查点的标头。

用户可以分析每个事件相关信息的消息标头。 若要为事件设置消息标头,所有自定义标头都将作为事件的应用程序属性放置,其中标头设置为属性键。 从事件中心接收事件时,所有应用程序属性都将转换为消息标头。

注意

不支持手动设置分区键、排队时间、偏移量和序列号的消息标头。

启用批处理使用者模式后,将列出批处理消息的特定标头,其中包含每个事件中心事件中的值列表。

在批处理侦听器模式下,事件中心消息/事件属性和 Spring Message 标头之间的映射:

事件中心事件属性 Spring Batch 消息标头常量 类型 描述
排队时间 EventHubsHeaders#ENQUEUED_TIME 即时列表 事件中心分区中排队的每个事件的即时列表(UTC)。
抵消 EventHubsHeaders#OFFSET Long 列表 从关联的事件中心分区接收每个事件的偏移量列表。
分区键 AzureHeaders#PARTITION_KEY 字符串列表 如果在最初发布每个事件时设置分区哈希键的列表。
序列号 EventHubsHeaders#SEQUENCE_NUMBER Long 列表 在关联事件中心分区中排队时分配给每个事件的序列号的列表。
系统属性 EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES 地图列表 每个事件的系统属性列表。
应用程序属性 EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES 地图列表 每个事件的应用程序属性的列表,其中放置了所有自定义的消息标头或事件属性。

注意

发布消息时,上述所有批处理标头都将从消息中删除(如果存在)。

样品

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库

Spring 与 Azure 服务总线集成

关键概念

Spring Integration 在基于 Spring 的应用程序中启用轻型消息传送,并支持通过声明性适配器与外部系统集成。

Azure 服务总线扩展项目的 Spring Integration 为 Azure 服务总线提供入站和出站通道适配器。

注意

CompletableFuture 支持 API 已从版本 2.10.0 弃用,由 4.0.0 版中的 Reactor Core 取代。 有关详细信息,请参阅 Javadoc。

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

配置

此初学者提供以下 2 部分的配置选项:

连接配置属性

本部分包含用于连接到 Azure 服务总线的配置选项。

注意

如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。

spring-cloud-azure-starter-integration-servicebus 的连接可配置属性:

财产 类型 描述
spring.cloud.azure.servicebus.enabled 布尔 是否启用 Azure 服务总线。
spring.cloud.azure.servicebus.connection-string 字符串 服务总线命名空间连接字符串值。
spring.cloud.azure.servicebus.custom-endpoint-address 字符串 连接到服务总线时要使用的自定义终结点地址。
spring.cloud.azure.servicebus.namespace 字符串 服务总线命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成
spring.cloud.azure.servicebus.domain-name 字符串 Azure 服务总线命名空间值的域名。

服务总线处理器配置属性

ServiceBusInboundChannelAdapter 使用 ServiceBusProcessorClient 来使用消息,配置 ServiceBusProcessorClient的整体属性,开发人员可以使用 ServiceBusContainerProperties 进行配置。 请参阅以下 了解如何使用 ServiceBusInboundChannelAdapter

基本用法

将消息发送到 Azure 服务总线

  1. 填写凭据配置选项。

    • 对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      

      注意

      Microsoft 建议使用最安全的可用身份验证流。 本过程中介绍的身份验证流程(例如数据库、缓存、消息传送或 AI 服务)需要非常高的信任度,并携带其他流中不存在的风险。 仅当更安全的选项(例如无密码连接或无密钥连接的托管标识)不可行时,才使用此流。 对于本地计算机操作,首选无密码连接或无密钥连接的用户标识。

    • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

注意

允许 tenant-id 的值包括:commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。

  • 对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

注意

允许 tenant-id 的值包括:commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。

  1. 使用 DefaultMessageHandler bean 创建 ServiceBusTemplate 以将消息发送到服务总线,并设置 ServiceBusTemplate 的实体类型。 此示例以服务总线队列为例。

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. 通过消息通道使用上述消息处理程序创建消息网关绑定。

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用网关发送消息。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

从 Azure 服务总线接收消息

  1. 填写凭据配置选项。

  2. 创建消息通道作为输入通道的 bean。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 ServiceBusInboundChannelAdapter bean 创建 ServiceBusMessageListenerContainer,以接收服务总线的消息。 此示例以服务总线队列为例。

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. 通过之前创建的消息通道创建消息接收器绑定,ServiceBusInboundChannelAdapter

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

配置 ServiceBusMessageConverter 以自定义 objectMapper

ServiceBusMessageConverter 作为可配置豆,允许用户自定义 ObjectMapper

服务总线消息标头

对于可映射到多个 Spring 标头常量的某些服务总线标头,列出了不同 Spring 标头的优先级。

服务总线标头与 Spring 标头之间的映射:

服务总线消息标头和属性 Spring message 标头常量 类型 配置 描述
内容类型 MessageHeaders#CONTENT_TYPE 字符串 是的 消息的RFC2045内容类型描述符。
相关 ID ServiceBusMessageHeaders#CORRELATION_ID 字符串 是的 消息的相关 ID
消息 ID ServiceBusMessageHeaders#MESSAGE_ID 字符串 是的 消息的消息 ID,此标头的优先级高于 MessageHeaders#ID
消息 ID MessageHeaders#ID UUID 是的 消息的消息 ID,此标头的优先级低于 ServiceBusMessageHeaders#MESSAGE_ID
分区键 ServiceBusMessageHeaders#PARTITION_KEY 字符串 是的 用于将消息发送到分区实体的分区键。
回复 MessageHeaders#REPLY_CHANNEL 字符串 是的 要向其发送答复的实体的地址。
回复会话 ID ServiceBusMessageHeaders#REPLY_TO_SESSION_ID 字符串 是的 消息的 ReplyToGroupId 属性值。
计划的排队时间 utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime 是的 应在服务总线中排队消息的日期/时间,此标头的优先级高于 AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE
计划的排队时间 utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 整数 是的 应在服务总线中排队消息的日期/时间,此标头的优先级低于 ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME
会话 ID ServiceBusMessageHeaders#SESSION_ID 字符串 是的 会话感知实体的会话 IDentifier。
生存时间 ServiceBusMessageHeaders#TIME_TO_LIVE 期间 是的 此消息过期之前的持续时间。
ServiceBusMessageHeaders#TO 字符串 是的 消息的“收件人”地址,保留供将来在路由方案中使用,目前被中转站本身忽略。
主题 ServiceBusMessageHeaders#SUBJECT 字符串 是的 邮件的主题。
死信错误说明 ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION 字符串 已死信的消息的说明。
死信原因 ServiceBusMessageHeaders#DEAD_LETTER_REASON 字符串 消息死信的原因。
死信源 ServiceBusMessageHeaders#DEAD_LETTER_SOURCE 字符串 消息为死信的实体。
传递计数 ServiceBusMessageHeaders#DELIVERY_COUNT 此消息传递到客户端的次数。
排队序列号 ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER 按服务总线分配给消息的排队序列号。
排队时间 ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime 此消息在服务总线中排队的日期时间。
过期时间 ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime 此消息到期的日期/时间。
锁定令牌 ServiceBusMessageHeaders#LOCK_TOKEN 字符串 当前消息的锁定令牌。
锁定到 ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime 此消息的锁定过期的日期/时间。
序列号 ServiceBusMessageHeaders#SEQUENCE_NUMBER 服务总线分配给消息的唯一编号。
ServiceBusMessageHeaders#STATE ServiceBusMessageState 消息的状态,可以是“活动”、“延迟”或“计划”。

分区键支持

此初学者通过允许在消息标头中设置分区键和会话 ID,支持 服务总线分区。 本部分介绍如何设置消息的分区键。

建议:使用 ServiceBusMessageHeaders.PARTITION_KEY 作为标头的键。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

不建议,但目前支持:AzureHeaders.PARTITION_KEY 作为标头的键。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

注意

当消息标头中同时设置 ServiceBusMessageHeaders.PARTITION_KEYAzureHeaders.PARTITION_KEY 时,首选 ServiceBusMessageHeaders.PARTITION_KEY

会话支持

此示例演示如何在应用程序中手动设置消息的会话 ID。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

注意

在消息标头中设置 ServiceBusMessageHeaders.SESSION_ID 并且还设置了不同的 ServiceBusMessageHeaders.PARTITION_KEY 标头时,会话 ID 的值最终将用于覆盖分区键的值。

自定义服务总线客户端属性

开发人员可以使用 AzureServiceClientBuilderCustomizer 来自定义服务总线客户端属性。 以下示例自定义 sessionIdleTimeout中的 ServiceBusClientBuilder 属性:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

样品

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库

Spring 与 Azure 存储队列集成

关键概念

Azure 队列存储是用于存储大量消息的服务。 使用 HTTP 或 HTTPS 通过经过身份验证的调用从世界任何地方访问消息。 队列消息的大小最多可为 64 KB。 队列可能包含数百万条消息,最大容量限制为存储帐户的总容量限制。 队列通常用于创建以异步方式处理的积压工作。

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

配置

此初学者提供以下配置选项:

连接配置属性

本部分包含用于连接到 Azure 存储队列的配置选项。

注意

如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。

spring-cloud-azure-starter-integration-storage-queue 的连接可配置属性:

财产 类型 描述
spring.cloud.azure.storage.queue.enabled 布尔 是否启用了 Azure 存储队列。
spring.cloud.azure.storage.queue.connection-string 字符串 存储队列命名空间连接字符串值。
spring.cloud.azure.storage.queue.accountName 字符串 存储帐户名称。
spring.cloud.azure.storage.queue.accountKey 字符串 存储帐户密钥。
spring.cloud.azure.storage.queue.endpoint 字符串 存储队列服务终结点。
spring.cloud.azure.storage.queue.sasToken 字符串 Sas 令牌凭据
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion 发出 API 请求时使用的 QueueServiceVersion。
spring.cloud.azure.storage.queue.messageEncoding 字符串 队列消息编码。

基本用法

将消息发送到 Azure 存储队列

  1. 填写凭据配置选项。

    • 对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      

      注意

      Microsoft 建议使用最安全的可用身份验证流。 本过程中介绍的身份验证流程(例如数据库、缓存、消息传送或 AI 服务)需要非常高的信任度,并携带其他流中不存在的风险。 仅当更安全的选项(例如无密码连接或无密钥连接的托管标识)不可行时,才使用此流。 对于本地计算机操作,首选无密码连接或无密钥连接的用户标识。

    • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

注意

允许 tenant-id 的值包括:commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。

  • 对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

注意

允许 tenant-id 的值包括:commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。

  1. 使用 DefaultMessageHandler bean 创建 StorageQueueTemplate,以将消息发送到存储队列。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. 通过消息通道创建包含上述消息处理程序的消息网关绑定。

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用网关发送消息。

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

从 Azure 存储队列接收消息

  1. 填写凭据配置选项。

  2. 创建消息通道作为输入通道的 bean。

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 StorageQueueMessageSource bean 创建 StorageQueueTemplate,以接收到存储队列的消息。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. 使用在上一步中创建的 StorageQueueMessageSource 通过之前创建的消息通道创建消息接收器绑定。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

样品

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库