Spring Cloud Stream 与 Azure 服务总线
本文介绍如何使用 Spring Cloud Stream Binder 通过服务总线 queues
和 topics
收发消息。
Azure 提供了一个异步消息平台,称为 Azure 服务总线(“服务总线”),该平台基于高级消息队列协议 1.0(“AMQP 1.0”)标准。 服务总线可用于各种受支持的 Azure 平台。
先决条件
Azure 订阅 - 免费创建订阅。
Java 开发工具包 (JDK) 版本 8 或更高版本。
Apache Maven 版本 3.2 或更高版本。
用来测试功能的 cURL 或类似的 HTTP 实用工具。
Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建一个 Maven 项目。 请务必选择 Maven 项目,并在依赖项下添加 Spring Web 和 Azure 支持依赖项,然后选择 Java 版本 8 或更高版本。
注意
若要授予帐户对 Azure 服务总线资源的访问权限,请将 Azure Service Bus Data Sender
和 Azure Service Bus Data Receiver
角色分配给当前使用的 Microsoft Entra 帐户。 有关授予访问权限角色的详细信息,请参阅使用 Azure 门户分配 Azure 角色和 使用 Microsoft Entra ID 对应用程序进行身份验证和授权,使之能够访问 Azure 服务总线实体。
重要
要完成本文中的步骤,需要 Spring Boot 版本 2.5 或更高版本。
从 Azure 服务总线发送和接收消息
使用 Azure 服务总线的队列或主题,可以使用 Spring Cloud Azure Stream Binder Service Bus 发送和接收消息。
要安装 Spring Cloud Azure Stream Binder Service Bus 模块,请将以下依赖项添加到 pom.xml 文件:
Spring Cloud Azure 物料清单 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.18.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.19.0
。 此物料清单 (BOM) 应在 pom.xml 文件的<dependencyManagement>
部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure Stream Binder Service Bus 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
编写应用程序代码
使用以下步骤配置应用程序,使其使用服务总线队列或主题来发送和接收消息。
在配置文件
application.properties
中配置服务总线凭据。spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
下表描述了配置中的字段:
字段 说明 spring.cloud.azure.servicebus.namespace
指定从 Azure 门户在服务总线中获取的命名空间。 spring.cloud.stream.bindings.consume-in-0.destination
指定在本教程中使用的服务总线队列或服务总线主题。 spring.cloud.stream.bindings.supply-out-0.destination
指定用于输入目标的相同值。 spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
指定是否自动解决消息。 如果设置为 false,则会添加 Checkpointer
的消息标头,使开发人员能够手动解决消息。spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
指定输出绑定的实体类型,可以是 queue
或topic
。spring.cloud.function.definition
指定要将哪个功能 bean 绑定到由绑定公开的外部目标。 spring.cloud.stream.poller.fixed-delay
为默认轮询器指定固定延迟(以毫秒为单位)。 默认值为 1000 L。建议值为 60000。 spring.cloud.stream.poller.initial-delay
指定定期触发器的初始延迟。 默认值为 0。 编辑启动类文件以显示以下内容。
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
提示
在本教程中,配置或代码中没有身份验证操作。 但连接到 Azure 服务需要进行身份验证。 要完成身份验证,需要使用 Azure 标识。 Spring Cloud Azure 使用 Azure 标识库提供的
DefaultAzureCredential
来帮助获取凭据,而无需更改任何代码。DefaultAzureCredential
支持多种身份验证方法,并确定应在运行时使用哪种方法。 通过这种方法,你的应用可在不同环境(例如本地与生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 DefaultAzureCredential。若要在本地开发环境中完成身份验证,可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 有关详细信息,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用用户分配的托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?
启动应用程序。 类似以下示例的消息将被发布在你的应用程序日志中:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed