Spring Cloud Azure 对 Spring Cloud Stream 的支持
本文 适用于:✅ 版本 4.19.0 ✅ 版本 5.19.0
Spring Cloud Stream 是一个框架,用于构建与共享消息系统连接的高度可缩放的事件驱动微服务。
该框架提供了一个灵活的编程模型,基于已建立和熟悉的 Spring 习惯和最佳做法。 这些最佳做法包括对持久性发布/子语义、使用者组和有状态分区的支持。
当前绑定器实现包括:
-
spring-cloud-azure-stream-binder-eventhubs
- 有关详细信息,请参阅 适用于 Azure 事件中心的 Spring Cloud Stream Binder - 有关详细信息,请参阅适用于 Azure 服务总线 的 Spring Cloud Stream Binder
适用于 Azure 事件中心的 Spring Cloud Stream Binder
关键概念
适用于 Azure 事件中心的 Spring Cloud Stream Binder 为 Spring Cloud Stream 框架提供绑定实现。 此实现在其基础上使用 Spring Integration 事件中心通道适配器。 从设计的角度来看,事件中心与 Kafka 类似。 此外,可以通过 Kafka API 访问事件中心。 如果项目依赖于 Kafka API,则可以尝试使用 Kafka API 示例 事件中心
使用者组
事件中心提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。 当 Kafka 将所有已提交的偏移量存储在中转站中时,必须存储正在手动处理的事件中心消息的偏移量。 事件中心 SDK 提供用于在 Azure 存储中存储此类偏移量的函数。
分区支持
事件中心提供与 Kafka 类似的物理分区概念。 但是,与 Kafka 在使用者和分区之间自动重新平衡不同,事件中心提供了一种抢占模式。 存储帐户充当租约,以确定哪个使用者拥有哪个分区。 当新的使用者启动时,它会尝试从加载最重的使用者中窃取某些分区,以实现工作负荷均衡。
若要指定负载均衡策略,将提供 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
的属性。 有关详细信息,请参阅 使用者属性 部分。
Batch 使用者支持
Spring Cloud Azure Stream 事件中心绑定器支持 Spring Cloud Stream Batch 使用者功能。
若要使用批处理使用者模式,请将 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
属性设置为 true
。 启用后,将收到包含批处理事件列表有效负载的消息,并将其传递给 Consumer
函数。 每个消息标头也转换为列表,其中内容是从每个事件分析的关联标头值。 分区 ID、检查点器和最后一个排队属性的公共标头显示为单个值,因为整个事件批次共享相同的值。 有关详细信息,请参阅 spring Integration
注意
仅当使用 MANUAL
检查点模式时,检查点标头才存在。
批处理使用者的检查点支持两种模式:BATCH
和 MANUAL
。
BATCH
模式是一种自动检查点模式,在绑定器收到事件后,将整个事件批处理一起检查点。
MANUAL
模式是检查用户的事件。 使用时,Checkpointer
将传递到消息标头中,用户可以使用它执行检查点。
可以通过设置前缀为 max-size
的 max-wait-time
和 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
属性来指定批大小。
max-size
属性是必需的,max-wait-time
属性是可选的。 有关详细信息,请参阅 使用者属性 部分。
依赖项设置
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
或者,也可以使用 Spring Cloud Azure Stream 事件中心初学者,如以下 Maven 示例所示:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
配置
绑定器提供以下三部分的配置选项:
连接配置属性
本部分包含用于连接到 Azure 事件中心的配置选项。
注意
如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。
spring-cloud-azure-stream-binder-eventhubs 的连接可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.enabled | 布尔 | 是否启用 Azure 事件中心。 |
spring.cloud.azure.eventhubs.connection-string | 字符串 | 事件中心命名空间连接字符串值。 |
spring.cloud.azure.eventhubs.namespace | 字符串 | 事件中心命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成 |
spring.cloud.azure.eventhubs.domain-name | 字符串 | Azure 事件中心命名空间值的域名。 |
spring.cloud.azure.eventhubs.custom-endpoint-address | 字符串 | 自定义终结点地址。 |
提示
常见的 Azure 服务 SDK 配置选项也可以针对 Spring Cloud Azure Stream 事件中心绑定器进行配置。
Spring Cloud Azure 配置中引入了受支持的配置选项,可以使用统一前缀 spring.cloud.azure.
或 spring.cloud.azure.eventhubs.
前缀进行配置。
默认情况下,绑定器还支持 Spring Could Azure 资源管理器
检查点配置属性
本部分包含存储 Blob 服务的配置选项,用于保存分区所有权和检查点信息。
注意
从版本 4.0.0 开始,如果未手动启用 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 的属性,则不会使用 spring.cloud.stream.bindings.binding-name.destination的名称自动创建存储容器。
检查点 spring-cloud-azure-stream-binder-eventhubs 的可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists | 布尔 | 是否允许创建容器(如果不存在)。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | 字符串 | 存储帐户的名称。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | 字符串 | 存储帐户访问密钥。 |
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | 字符串 | 存储容器名称。 |
提示
常见的 Azure 服务 SDK 配置选项也可用于存储 Blob 检查点存储。
Spring Cloud Azure 配置中引入了受支持的配置选项,可以使用统一前缀 spring.cloud.azure.
或 spring.cloud.azure.eventhubs.processor.checkpoint-store
前缀进行配置。
Azure 事件中心绑定配置属性
以下选项分为四个部分:使用者属性、高级使用者配置、生成者属性和高级生成者配置。
使用者属性
这些属性通过 EventHubsConsumerProperties
公开。
注意
为了避免重复,由于版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 事件中心支持为所有通道设置值,格式为 spring.cloud.stream.eventhubs.default.consumer.<property>=<value>
。
spring-cloud-azure-stream-binder-eventhubs 的使用者可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode | CheckpointMode | 使用者决定如何检查点消息时使用的检查点模式 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count | 整数 | 确定每个分区的消息量以执行一个检查点。 仅在使用 PARTITION_COUNT 检查点模式时生效。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval | 期间 | 决定执行一个检查点的时间间隔。 仅在使用 TIME 检查点模式时生效。 |
spring.cloud.stream.eventhubs.bindings。<binding-name.consumer.batch.max 大小 | 整数 | 批处理中的最大事件数。 批处理使用者模式是必需的。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | 期间 | 批处理使用的最大持续时间。 只有在启用批处理使用者模式并且是可选的时才生效。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | 期间 | 更新的时间间隔持续时间。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | 负载均衡策略。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | 期间 | 分区所有权过期的持续时间。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | 布尔 | 事件处理程序是否应请求有关其关联分区上最后排队事件的信息,并跟踪接收事件时的信息。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | 整数 | 使用者用来控制事件中心使用者在本地主动接收和排队的事件数的计数。 |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | 将键映射为分区 ID 和 StartPositionProperties 的值 |
如果检查点存储中不存在分区的检查点,则包含要用于每个分区的事件位置的映射。 此映射从分区 ID 中键键。 |
注意
initial-partition-event-position
配置接受一个 map
来指定每个事件中心的初始位置。 因此,其键是分区 ID,值是 StartPositionProperties
,其中包括偏移量、序列号、排队日期时间和是否非独占的属性。 例如,可以将它设置为
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
高级使用者配置
上述 连接、检查点,以及 常见的 Azure SDK 客户端 配置支持对每个绑定器使用者进行自定义,可以使用前缀 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
进行配置。
生成者属性
这些属性通过 EventHubsProducerProperties
公开。
注意
为了避免重复,由于版本 4.19.0 和 5.19.0,Spring Cloud Azure Stream Binder 事件中心支持为所有通道设置值,格式为 spring.cloud.stream.eventhubs.default.producer.<property>=<value>
。
spring-cloud-azure-stream-binder-eventhubs 的生成者可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | 布尔 | 生成者的同步的开关标志。 如果为 true,生成者将在发送操作后等待响应。 |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | 长 | 发送操作后等待响应的时间量。 仅当启用同步生成者时才会生效。 |
高级生成者配置
上述 连接 和 常见的 Azure SDK 客户端 配置支持对每个绑定器生成者进行自定义,可以使用前缀 spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
进行配置。
基本用法
从/向事件中心发送和接收消息
使用凭据信息填充配置选项。
对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
定义供应商和使用者。
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
分区支持
将创建包含用户提供的分区信息的 PartitionSupplier
,以配置要发送的消息的分区信息。 以程图显示了获取分区 ID 和键的不同优先级的过程:
显示分区支持过程的流程图的
Batch 使用者支持
提供批处理配置选项,如以下示例所示:
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
定义供应商和使用者。
对于
BATCH
检查点模式,可以使用以下代码发送消息并在批处理中使用。@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
对于
MANUAL
检查点模式,可以使用以下代码在批处理中发送消息和使用/检查点。@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
注意
在批处理使用模式下,Spring Cloud Stream 绑定器的默认内容类型 application/json
,因此请确保消息有效负载与内容类型保持一致。 例如,使用默认内容类型 application/json
接收具有 String
有效负载的消息时,有效负载应 JSON String
,并用原始 String
文本的双引号括起来。 虽然对于 text/plain
内容类型,它可以是直接 String
对象。 有关详细信息,请参阅 Spring Cloud Stream 内容类型协商。
处理错误消息
处理出站绑定错误消息
默认情况下,Spring Integration 将创建一个名为
errorChannel
的全局错误通道。 配置以下消息终结点以处理出站绑定错误消息。@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
处理入站绑定错误消息
Spring Cloud Stream 事件中心 Binder 支持一种解决方案来处理入站消息绑定的错误:错误处理程序。
错误处理程序:
Spring Cloud Stream 通过添加接受
Consumer
实例的ErrorMessage
来公开一种机制,以便提供自定义错误处理程序。 有关详细信息,请参阅 Spring Cloud Stream 文档中 处理错误消息。绑定默认错误处理程序
配置单个
Consumer
bean 以使用所有入站绑定错误消息。 以下默认函数订阅每个入站绑定错误通道:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
还需要将
spring.cloud.stream.default.error-handler-definition
属性设置为函数名称。特定于绑定的错误处理程序
将
Consumer
bean 配置为使用特定的入站绑定错误消息。 以下函数订阅特定的入站绑定错误通道,优先级高于绑定默认错误处理程序:@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
还需要将
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
属性设置为函数名称。
事件中心消息标头
有关支持的基本消息标头,请参阅 spring Integration
多个绑定器支持
使用多个绑定器也支持连接到多个事件中心命名空间。 此示例采用连接字符串作为示例。 还支持服务主体和托管标识的凭据。 可以在每个联编程序的环境设置中设置相关属性。
若要将多个绑定器用于事件中心,请在 application.yml 文件中配置以下属性:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
注意
前面的应用程序文件演示如何为所有绑定配置应用程序的单个默认轮询器。 如果要为特定绑定配置轮询器,可以使用
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
等配置。我们需要定义两个供应商和两个消费者:
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
资源预配
事件中心绑定器支持预配事件中心和使用者组,用户可以使用以下属性来启用预配。
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
样品
有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库
适用于 Azure 服务总线的 Spring Cloud Stream Binder
关键概念
适用于 Azure 服务总线的 Spring Cloud Stream Binder 为 Spring Cloud Stream Framework 提供绑定实现。 此实现在其基础上使用 Spring Integration Service 总线通道适配器。
计划消息
此绑定器支持将消息提交到主题以供延迟处理。 用户可以发送具有标头 x-delay
以毫秒为单位表示消息延迟时间的计划消息。 消息将在 x-delay
毫秒后传递到相应的主题。
使用者组
服务总线主题提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。
此绑定器依赖于主题 Subscription
充当使用者组。
依赖项设置
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
或者,也可以使用 Spring Cloud Azure Stream 服务总线初学者,如以下 Maven 示例所示:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
配置
绑定器提供以下两部分的配置选项:
连接配置属性
本部分包含用于连接到 Azure 服务总线的配置选项。
注意
如果选择使用安全主体对访问 Azure 资源的 Microsoft Entra ID 进行身份验证和授权,请参阅 使用 Microsoft Entra ID 授权访问,以确保安全主体已获得访问 Azure 资源的足够权限。
spring-cloud-azure-stream-binder-servicebus 的连接可配置属性:
财产 | 类型 | 描述 |
---|---|---|
spring.cloud.azure.servicebus.enabled | 布尔 | 是否启用 Azure 服务总线。 |
spring.cloud.azure.servicebus.connection-string | 字符串 | 服务总线命名空间连接字符串值。 |
spring.cloud.azure.servicebus.custom-endpoint-address | 字符串 | 连接到服务总线时要使用的自定义终结点地址。 |
spring.cloud.azure.servicebus.namespace | 字符串 | 服务总线命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成 |
spring.cloud.azure.servicebus.domain-name | 字符串 | Azure 服务总线命名空间值的域名。 |
注意
常见的 Azure 服务 SDK 配置选项也可以针对 Spring Cloud Azure 流服务总线绑定器进行配置。
Spring Cloud Azure 配置中引入了受支持的配置选项,可以使用统一前缀 spring.cloud.azure.
或 spring.cloud.azure.servicebus.
前缀进行配置。
默认情况下,绑定器还支持 Spring Could Azure 资源管理器
Azure 服务总线绑定配置属性
以下选项分为四个部分:使用者属性、高级使用者配置、生成者属性和高级生成者配置。
使用者属性
这些属性通过 ServiceBusConsumerProperties
公开。
注意
为了避免重复,自版本 4.19.0 和 5.19.0 起,Spring Cloud Azure Stream Binder 服务总线支持为所有通道设置值,格式为 spring.cloud.stream.servicebus.default.consumer.<property>=<value>
。
spring-cloud-azure-stream-binder-servicebus 的使用者可配置属性:
财产 | 类型 | 违约 | 描述 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | 布尔 | 假 | 如果失败的消息路由到 DLQ。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | 整数 | 1 | 服务总线处理器客户端应处理的最大并发消息数。 启用会话后,它将应用于每个会话。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | 整数 | 零 | 要在任何给定时间处理的最大并发会话数。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | 布尔 | 零 | 是否启用会话。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | 整数 | 0 | 服务总线处理器客户端的预提取计数。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | SubQueue | 没有 | 要连接到的子队列的类型。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | 期间 | 5m | 继续自动续订锁的时间量。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | 服务总线处理器客户端的接收模式。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | 布尔 | 真 | 是否自动解决消息。 如果设置为 false,则会添加 Checkpointer 的消息标头,使开发人员能够手动解决消息。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max MB 大小 | Long | 1024 | 队列/主题的最大大小(以兆字节为单位),即为队列/主题分配的内存大小。 |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live | 期间 | P10675199DT2H48M5.4775807S。 (10675199天、2 小时、48 分钟、5 秒和 477 毫秒) | 消息过期的持续时间,从消息发送到服务总线时开始。 |
重要
使用 Azure 资源管理器(ARM)时,必须配置 spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type
属性。 有关详细信息,请参阅 GitHub 上的 servicebus-queue-binder-arm 示例。
高级使用者配置
上述 连接 和 常见的 Azure SDK 客户端 配置支持对每个绑定器使用者进行自定义,可以使用前缀 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
进行配置。
生成者属性
这些属性通过 ServiceBusProducerProperties
公开。
注意
为了避免重复,自版本 4.19.0 和 5.19.0 起,Spring Cloud Azure Stream Binder 服务总线支持为所有通道设置值,格式为 spring.cloud.stream.servicebus.default.producer.<property>=<value>
。
spring-cloud-azure-stream-binder-servicebus 的生成者可配置属性:
财产 | 类型 | 违约 | 描述 |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | 布尔 | 假 | 用于同步生成者的切换标志。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | 长 | 10000 | 发送生成者的超时值。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | 零 | 绑定生成者所需的生成者的服务总线实体类型。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-mbtes | Long | 1024 | 队列/主题的最大大小(以兆字节为单位),即为队列/主题分配的内存大小。 |
spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live | 期间 | P10675199DT2H48M5.4775807S。 (10675199天、2 小时、48 分钟、5 秒和 477 毫秒) | 消息过期的持续时间,从消息发送到服务总线时开始。 |
重要
使用绑定生成者时,需要配置 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
的属性。
高级生成者配置
上述 连接 和 常见的 Azure SDK 客户端 配置支持对每个绑定器生成者进行自定义,可以使用前缀 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
进行配置。
基本用法
发送和接收来自/到服务总线的消息
使用凭据信息填充配置选项。
对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
定义供应商和使用者。
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
分区键支持
绑定器通过允许在消息标头中设置分区键和会话 ID,支持 服务总线分区。 本部分介绍如何设置消息的分区键。
Spring Cloud Stream 提供分区键 SpEL 表达式属性 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
。 例如,将此属性设置为 "'partitionKey-' + headers[<message-header-key>]"
并添加一个名为 message-header-key 的标头。 Spring Cloud Stream 在计算表达式以分配分区键时使用此标头的值。 以下代码提供示例生成者:
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
会话支持
绑定器支持服务总线
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
注意
根据 服务总线分区,会话 ID 的优先级高于分区键。 因此,当同时设置 ServiceBusMessageHeaders#SESSION_ID
和 ServiceBusMessageHeaders#PARTITION_KEY
标头时,会话 ID 的值最终用于覆盖分区键的值。
处理错误消息
处理出站绑定错误消息
默认情况下,Spring Integration 将创建一个名为
errorChannel
的全局错误通道。 配置以下消息终结点以处理出站绑定错误消息。@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
处理入站绑定错误消息
Spring Cloud Stream 服务总线绑定器支持两种解决方案来处理入站消息绑定的错误:绑定器错误处理程序和处理程序。
Binder 错误处理程序:
默认绑定程序错误处理程序处理入站绑定。 启用
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
时,使用此处理程序将失败的消息发送到死信队列。 否则,将放弃失败的消息。 绑定器错误处理程序与其他提供的错误处理程序互斥。错误处理程序:
Spring Cloud Stream 通过添加接受
Consumer
实例的ErrorMessage
来公开一种机制,以便提供自定义错误处理程序。 有关详细信息,请参阅 Spring Cloud Stream 文档中 处理错误消息。绑定默认错误处理程序
配置单个
Consumer
bean 以使用所有入站绑定错误消息。 以下默认函数订阅每个入站绑定错误通道:@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
还需要将
spring.cloud.stream.default.error-handler-definition
属性设置为函数名称。特定于绑定的错误处理程序
将
Consumer
bean 配置为使用特定的入站绑定错误消息。 以下函数订阅优先级高于绑定默认错误处理程序的特定入站绑定错误通道。@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
还需要将
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
属性设置为函数名称。
服务总线消息标头
有关支持的基本消息标头,请参阅 spring Integration
注意
设置分区键时,消息标头的优先级高于 Spring Cloud Stream 属性。 因此,仅当未配置任何 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
和 ServiceBusMessageHeaders#SESSION_ID
标头时,ServiceBusMessageHeaders#PARTITION_KEY
才会生效。
多个绑定器支持
使用多个绑定器也支持连接到多个服务总线命名空间。 此示例采用连接字符串作为示例。 还支持服务主体和托管标识的凭据,用户可以在每个绑定器的环境设置中设置相关属性。
若要使用 ServiceBus 的多个绑定器,请在 application.yml 文件中配置以下属性:
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
注意
前面的应用程序文件演示如何为所有绑定配置应用程序的单个默认轮询器。 如果要为特定绑定配置轮询器,可以使用
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
等配置。我们需要定义两个供应商和两个消费者
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
资源预配
服务总线绑定器支持预配队列、主题和订阅,用户可以使用以下属性来启用预配。
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
注意
允许 tenant-id
的值包括:common
、organizations
、consumers
或租户 ID。 有关这些值的详细信息,请参阅 使用错误的终结点(个人和组织帐户)错误AADSTS50020 - 租户中不存在来自标识提供者的用户帐户。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
自定义服务总线客户端属性
开发人员可以使用 AzureServiceClientBuilderCustomizer
来自定义服务总线客户端属性。 以下示例自定义 sessionIdleTimeout
中的 ServiceBusClientBuilder
属性:
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
样品
有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库