在 Spring 应用程序中使用 Azure 服务总线
本文介绍如何在使用 Spring 框架生成的 Java 应用程序中使用 Azure 服务总线。
Azure 提供了一个名为 Azure 服务总线(服务总线)的异步消息传送平台,该平台基于高级消息队列协议 1.0 (AMQP 1.0) 标准。 可以在一系列受支持的 Azure 平台上使用服务总线。
Spring Cloud Azure 提供了各种模块,用于使用 Spring 框架向服务总线队列和主题/订阅发送消息以及接收消息。
可以单独使用以下模块,也可以将它们组合用于不同的用例:
Spring Cloud Azure Service Bus Starter 使你能够使用具有 Spring Boot 功能的服务总线 Java SDK 客户端库发送和接收消息。
Spring Cloud Azure Service Bus JMS Starter 使你能够使用 JMS API 发送和接收包含服务总线队列和主题/订阅的消息。
Spring 消息传递 Azure 服务总线使你能够通过 Spring Messaging API 与服务总线进行交互。
Spring 集成 Azure 服务总线使你能够将 Spring 集成消息通道与服务总线 连接。
Spring Cloud Stream Binder for Service Bus 使你能够将服务总线用作 Spring Cloud Stream 应用程序中的消息传递中间件。
先决条件
- Azure 订阅 - 免费创建订阅。
- Java 开发工具包 (JDK) 版本 8 或更高版本。
- Apache Maven 版本 3.0 或更高版本。
- Azure 服务总线和队列或主题/订阅。 如果没有队列或主题,请创建服务总线队列或主题。 有关详细信息,请参阅使用 Azure 门户创建服务总线命名空间和队列或使用 Azure 门户创建服务总线主题和该主题的订阅。
- 如果你没有 Spring Boot 应用程序,请使用 Spring Initializr 创建一个 Maven 项目。 请务必选择“Maven 项目”,并在“依赖项”下添加“Spring Web”依赖项,然后选择“Java 版本 8 或更高版本”。
注意
若要授予帐户对服务总线资源的访问权限,请在新创建的 Azure 服务总线命名空间中,将Azure 服务总线数据发送方和 Azure 服务总线数据接收方角色分配给当前正在使用的 Microsoft Entra 帐户。 有关详细信息,请参阅使用 Azure 门户分配 Azure 角色。
重要
要完成本教程中的步骤,需要 Spring Boot 版本 2.5 或更高版本。
准备本地环境
在本教程中,配置和代码没有任何身份验证操作。 但连接到 Azure 服务需要进行身份验证。 要完成身份验证,需要使用 Azure 标识客户端库。 Spring Cloud Azure 使用 Azure 标识库提供的 DefaultAzureCredential
来帮助获取凭据,而无需更改任何代码。
DefaultAzureCredential
支持多种身份验证方法,并确定应在运行时使用哪种方法。 通过这种方法,你的应用可在不同环境(例如本地或生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅对 Azure 托管的 Java 应用程序进行身份验证的 DefaultAzureCredential 部分。
若要使用 Azure CLI、IntelliJ 或其他方法在本地开发环境中完成身份验证,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?
注意
适用于 JMS API 的 Azure 服务总线目前不支持 DefaultAzureCredential
。 如果要将 Spring JMS 与服务总线配合使用,请忽略此步骤。
使用 Spring Cloud Azure Service Bus Starter
Spring Cloud Azure Service Bus Starter 模块使用 Spring Boot 框架导入服务总线 Java 客户端库。 可以在非互斥模式中使用 Spring Cloud Azure 和 Azure SDK。 因此,可以在 Spring 应用程序中继续使用服务总线 Java 客户端 API。
添加服务总线依赖项
要安装 Spring Cloud Azure Service Bus Starter 模块,请将以下依赖项添加到 pom.xml 文件:
Spring Cloud Azure 物料清单 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.19.0
。 此物料清单 (BOM) 应在 pom.xml 文件的<dependencyManagement>
部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure Service Bus 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
本指南介绍如何在 Spring 应用程序的上下文中使用服务总线 Java 客户端。 在这里,我们介绍两种替代方法。 推荐的方法是使用 Spring Boot 自动配置,并从 Spring 上下文中使用现成的客户端。 另一种方法是以编程方式自行构建客户端。
第一种方法涉及从 Spring IoC 容器自动连接客户端 bean,与第二种方法相比,具有以下优势。 使用服务总线客户端进行开发时,这些优势可提供更灵活、更高效的体验。
可以使用外部化配置,这样你就可以在不同的环境中使用相同的应用程序代码。
可以将学习生成器模式并将此客户端注册到应用程序上下文的过程委托给 Spring Boot 框架。 通过此委托,你可以专注于如何根据自己的业务需求使用客户端。
可以使用运行状况指示器以一种简单的方式来检查应用程序和内部组件的状态和运行状况。
下面的代码示例演示了如何将 ServiceBusSenderClient
和 ServiceBusProcessorClient
与这两个选项一起使用。
注意
Azure Java SDK for Service Bus 提供多个客户端与服务总线交互。 初学者还为所有服务总线客户端和客户端生成器提供自动配置。 这里我们只使用 ServiceBusSenderClient
和 ServiceBusProcessorClient
作为示例。
使用 Spring Boot 自动配置
若要向服务总线发送消息并从中接收消息,请使用以下步骤配置应用程序:
配置服务总线命名空间和队列,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<your-servicebus-namespace-name> spring.cloud.azure.servicebus.entity-name=<your-servicebus-queue-name> spring.cloud.azure.servicebus.entity-type=queue
提示
此处我们使用服务总线队列作为示例。 若要使用主题/订阅,需要添加
spring.cloud.azure.servicebus.processor.subscription-name
属性,并将entity-type
值更改为topic
。创建一个新
ServiceBusProcessorClientConfiguration
Java 类,如以下示例所示。 此类用于注册ServiceBusProcessorClient
的消息和错误处理程序。@Configuration(proxyBeanMethods = false) public class ServiceBusProcessorClientConfiguration { @Bean ServiceBusRecordMessageListener processMessage() { return context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Id: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }; } @Bean ServiceBusErrorHandler processError() { return context -> { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); }; } }
在 Spring 应用程序中注入
ServiceBusSenderClient
,并调用相关 API 以发送消息,如以下示例所示:@SpringBootApplication public class ServiceBusQueueApplication implements CommandLineRunner { private final ServiceBusSenderClient senderClient; public ServiceBusQueueApplication(ServiceBusSenderClient senderClient) { this.senderClient = senderClient; } public static void main(String[] args) { SpringApplication.run(ServiceBusQueueApplication.class, args); } @Override public void run(String... args) throws Exception { // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.printf("Sent a message to the queue"); senderClient.close(); // wait the processor client to consume messages TimeUnit.SECONDS.sleep(10); } }
注意
默认情况下,自动连接
ServiceBusProcessorClient
bean 的生命周期由 Spring 上下文管理。 当 Spring 应用程序上下文启动时,处理器会自动启动,并在 Spring 应用程序上下文停止时停止。 若要禁用此功能,请配置spring.cloud.azure.servicebus.processor.auto-startup=false
。启动应用程序。 你将看到与以下示例类似的日志:
Sent a message to the queue Processing message. Id: 6f405435200047069a3caf80893a80bc, Sequence #: 1. Contents: Hello, World!
以编程方式生成服务总线客户端
你可以自行生成这些客户端 bean,但该过程很复杂。 在 Spring Boot 应用程序中,必须管理属性、了解生成器模式,并将客户端注册到 Spring 应用程序上下文。 以下代码示例演示了如何执行这项操作:
创建一个新
ServiceBusClientConfiguration
Java 类,如以下示例所示。 此类用于声明ServiceBusSenderClient
和ServiceBusProcessorClient
bean。@Configuration(proxyBeanMethods = false) public class ServiceBusClientConfiguration { private static final String SERVICE_BUS_FQDN = "<service-bus-fully-qualified-namespace>"; private static final String QUEUE_NAME = "<service-bus-queue-name>"; @Bean ServiceBusClientBuilder serviceBusClientBuilder() { return new ServiceBusClientBuilder() .fullyQualifiedNamespace(SERVICE_BUS_FQDN) .credential(new DefaultAzureCredentialBuilder().build()); } @Bean ServiceBusSenderClient serviceBusSenderClient(ServiceBusClientBuilder builder) { return builder .sender() .queueName(QUEUE_NAME) .buildClient(); } @Bean ServiceBusProcessorClient serviceBusProcessorClient(ServiceBusClientBuilder builder) { return builder.processor() .queueName(QUEUE_NAME) .processMessage(ServiceBusClientConfiguration::processMessage) .processError(ServiceBusClientConfiguration::processError) .buildProcessorClient(); } private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Id: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); } private static void processError(ServiceBusErrorContext context) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); } }
注意
请务必将
<service-bus-fully-qualified-namespace>
占位符替换为 Azure 门户中的服务总线主机名。 用你在服务总线命名空间中配置的队列名称替换<service-bus-queue-name>
占位符。将客户端 Bean 注入到你的应用程序中,如以下示例所示:
@SpringBootApplication public class ServiceBusQueueApplication implements CommandLineRunner { private final ServiceBusSenderClient senderClient; private final ServiceBusProcessorClient processorClient; public ServiceBusQueueApplication(ServiceBusSenderClient senderClient, ServiceBusProcessorClient processorClient) { this.senderClient = senderClient; this.processorClient = processorClient; } public static void main(String[] args) { SpringApplication.run(ServiceBusQueueApplication.class, args); } @Override public void run(String... args) throws Exception { // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.printf("Sent a message to the queue"); senderClient.close(); System.out.printf("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.printf("Stopping and closing the processor"); processorClient.close(); } }
启动应用程序。 你将看到与以下示例类似的日志:
Sent a message to the queue Starting the processor ... Processing message. Id: 6f405435200047069a3caf80893a80bc, Sequence #: 1. Contents: Hello, World! Stopping and closing the processor
以下列表显示了此代码不灵活或正常的一些原因:
- 命名空间和队列/主题/订阅名称已硬编码。
- 如果使用
@Value
从 Spring 环境获取配置,则 application.properties 文件中不能有 IDE 提示。 - 如果有微服务方案,则必须复制每个项目中的代码,这很容易出错,也很难保持一致。
幸运的是,使用 Spring Cloud Azure 不需要自行构建客户端 bean。 相反,你可以直接注入 bean,并使用已经熟悉的配置属性来配置服务总线。
Spring Cloud Azure 还为不同方案提供以下全局配置。 有关详细信息,请参阅 Spring Cloud Azure 配置的 Azure 服务 SDK 的全局配置部分。
- 代理选项。
- 重试选项。
- AMQP 传输客户端选项。
还可以连接到不同的 Azure 云。 有关详细信息,请参阅连接到不同的 Azure 云。
使用 Spring Cloud Azure Service Bus JMS Starter
Spring Cloud Azure Service Bus JMS Starter 模块提供 Spring JMS 与服务总线的集成。 以下视频介绍如何使用 JMS 2.0 将 Spring JMS 应用程序与 Azure 服务总线集成。
本指南介绍如何使用 Spring Cloud Azure Service Bus Starter for JMS API 向服务总线发送消息和从服务总线接收消息。
添加服务总线依赖项
要安装 Spring Cloud Azure Service Bus JMS Starter 模块,请将以下依赖项添加到 pom.xml 文件:
Spring Cloud Azure 物料清单 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.19.0
。 此物料清单 (BOM) 应在 pom.xml 文件的<dependencyManagement>
部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure Service Bus JMS 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId> </dependency>
编写应用程序代码以发送和接收消息
为服务总线配置连接字符串和定价层,如以下示例所示:
spring.jms.servicebus.connection-string=<service-bus-namespace-connection-string> spring.jms.servicebus.pricing-tier=<service-bus-pricing-tier>
创建消息接收方。
Spring 提供了将消息发布到任何 POJO(普通旧 Java 对象)的方法。 首先,定义一个用于存储和检索用户名的通用
User
类,如以下示例所示:public class User implements Serializable { private static final long serialVersionUID = -295422703255886286L; private String name; public User() { } public User(String name) { setName(name); } public String getName() { return name; } public void setName(String name) { this.name = name; } }
提示
实现
Serializable
是为了使用 Spring 框架的JmsTemplate
中的send
方法。 否则,你应该定义一个自定义MessageConverter
Bean,以将内容序列化为文本格式的 JSON。 有关MessageConverter
的详细信息,请参阅官方的 Spring JMS Starter 项目。在此处,可以创建新的
QueueReceiveService
Java 类,如以下示例所示。 此类用于定义消息接收方。@Component public class QueueReceiveService { private static final String QUEUE_NAME = "<service-bus-queue-name>"; @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory") public void receiveMessage(User user) { System.out.printf("Received a message from %s.", user.getName()); } }
注意
请确保用你在服务总线命名空间中配置的队列名称替换
<service-bus-queue-name>
占位符。如果使用主题/订阅,请将
destination
参数更改为主题名称,而containerFactory
应为topicJmsListenerContainerFactory
。 此外,请添加subscription
参数来描述订阅名称。将发送方和接收方连接起来,使用 Spring 发送和接收消息,如以下示例所示:
@SpringBootApplication @EnableJms public class ServiceBusJmsStarterApplication { private static final String QUEUE_NAME = "<service-bus-queue-name>"; public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(ServiceBusJMSQueueApplication.class, args); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); // Send a message with a POJO - the template reuse the message converter System.out.println("Sending a user message."); jmsTemplate.convertAndSend(QUEUE_NAME, new User("Tom")); } }
注意
请确保用你在服务总线命名空间中配置的队列名称替换
<service-bus-queue-name>
占位符。提示
请确保添加
@EnableIntegration
注释,这会触发发现用@JmsListener
注释的方法,从而在幕后创建消息侦听器容器。启动应用程序。 你将看到与以下示例类似的日志:
Sending a user message. Received a message from Tom.
其他信息
有关详细信息,请参阅如何将 JMS API 与服务总线和 AMQP 1.0 配合使用。
使用 Spring 消息传递 Azure 服务总线
Spring 消息传递 Azure 服务总线模块通过服务总线为 Spring Messaging 框架提供支持。
如果使用 Spring Messaging Azure 服务总线,则可以使用以下功能:
ServiceBusTemplate
:以异步和同步方式将消息发送到服务总线队列和主题。@ServiceBusListener
:将方法标记为目标上的服务总线消息侦听器的目标。
本指南介绍如何使用 Spring 消息传递 Azure 服务总线向服务总线发送消息以及从服务总线接收消息。
添加服务总线依赖项
要安装 Spring 消息传递 Azure 服务总线模块,请将以下依赖项添加到 pom.xml 文件:
Spring Cloud Azure 物料清单 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.19.0
。 此物料清单 (BOM) 应在 pom.xml 文件的<dependencyManagement>
部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure。Spring 消息传递 Azure 服务总线和 Spring Cloud Azure starter 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-messaging-azure-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
配置服务总线命名空间和队列,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<service-bus-namespace-name> spring.cloud.azure.servicebus.entity-type=queue
注意
如果使用主题/订阅,请将
spring.cloud.azure.servicebus.entity-type
值更改为topic
。创建一个新
ConsumerService
Java 类,如以下示例所示。 此类用于定义消息接收方。@Service public class ConsumerService { private static final String QUEUE_NAME = "<service-bus-queue-name>"; @ServiceBusListener(destination = QUEUE_NAME) public void handleMessageFromServiceBus(String message) { System.out.printf("Consume message: %s%n", message); } }
注意
如果使用主题/订阅,请将
destination
的注释参数更改为主题名称,并添加group
参数来描述订阅名称。将发送方和接收方连接起来,使用 Spring 发送和接收消息,如以下示例所示:
@SpringBootApplication @EnableAzureMessaging public class Application { private static final String QUEUE_NAME = "<service-bus-queue-name>"; public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class); ServiceBusTemplate serviceBusTemplate = applicationContext.getBean(ServiceBusTemplate.class); System.out.println("Sending a message to the queue."); serviceBusTemplate.sendAsync(QUEUE_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe(); } }
提示
请确保添加
@EnableAzureMessaging
注释,这会触发发现用@ServiceBusListener
注释的方法,从而在幕后创建消息侦听器容器。启动应用程序。 你将看到与以下示例类似的日志:
Sending a message to the queue. Consume message: Hello world.
使用 Spring 集成 Azure 服务总线
Spring 集成 Azure 服务总线模块通过服务总线为 Spring 集成框架提供支持。
如果 Spring 应用程序使用 Spring 集成消息通道,则可以使用通道适配器在消息通道和服务总线之间路由消息。
入站通道适配器将来自服务总线队列或订阅的消息转发到消息通道。 出站通道适配器将消息从消息通道发布到服务总线队列和主题。
本指南介绍如何使用 Spring 集成 Azure 服务总线向服务总线发送消息以及从服务总线接收消息。
添加服务总线依赖项
要安装 Spring Cloud Azure Service Bus Integration Starter 模块,请将以下依赖项添加到 pom.xml 文件:
Spring Cloud Azure 物料清单 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.19.0
。 此物料清单 (BOM) 应在 pom.xml 文件的<dependencyManagement>
部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure Service Bus Integration 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
配置服务总线命名空间,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<your-servicebus-namespace-name>
创建一个新
QueueReceiveConfiguration
Java 类,如以下示例所示。 此类用于定义消息接收方。@Configuration public class QueueReceiveConfiguration { private static final String INPUT_CHANNEL = "queue.input"; private static final String QUEUE_NAME = "<your-servicebus-queue-name>"; private static final String SERVICE_BUS_MESSAGE_LISTENER_CONTAINER = "queue-listener-container"; /** * This message receiver binding with {@link ServiceBusInboundChannelAdapter} * via {@link MessageChannel} has name {@value INPUT_CHANNEL} */ @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload) { String message = new String(payload); System.out.printf("New message received: '%s'%n", message); } @Bean(SERVICE_BUS_MESSAGE_LISTENER_CONTAINER) public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, @Qualifier(SERVICE_BUS_MESSAGE_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean(name = INPUT_CHANNEL) public MessageChannel input() { return new DirectChannel(); } }
创建一个新
QueueSendConfiguration
Java 类,如以下示例所示。 此类用于定义消息发送方。@Configuration public class QueueSendConfiguration { private static final String OUTPUT_CHANNEL = "queue.output"; private static final String QUEUE_NAME = "<your-servicebus-queue-name>"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { System.out.println("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { System.out.println("There was an error sending the message."); } }); return handler; } /** * Message gateway binding with {@link MessageHandler} * via {@link MessageChannel} has name {@value OUTPUT_CHANNEL} */ @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
将发送方和接收方连接起来,使用 Spring 发送和接收消息,如以下示例所示:
@SpringBootApplication @EnableIntegration @Configuration(proxyBeanMethods = false) public class ServiceBusIntegrationApplication { public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(ServiceBusIntegrationApplication.class, args); QueueSendConfiguration.QueueOutboundGateway outboundGateway = applicationContext.getBean(QueueSendConfiguration.QueueOutboundGateway.class); System.out.println("Sending a message to the queue"); outboundGateway.send("Hello World"); } }
提示
请确保添加
@EnableIntegration
批注,这将启用 Spring 集成基础结构。启动应用程序。 你将看到与以下示例类似的日志:
Message was sent successfully. New message received: 'Hello World'
使用 Spring Cloud Stream Service Bus Binder
若要在 Spring Cloud Stream 应用程序中调用服务总线 API,请使用 Spring Cloud Azure Service Bus Stream Binder 模块。
本指南介绍如何使用 Spring Cloud Stream Service Bus Binder 向服务总线发送消息以及从服务总线接收消息。
添加服务总线依赖项
要安装 Spring Cloud Azure Service Bus Stream Binder 模块,请将以下依赖项添加到 pom.xml 文件:
Spring Cloud Azure 物料清单 (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies
版本设置为4.19.0
。 此物料清单 (BOM) 应在 pom.xml 文件的<dependencyManagement>
部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure。Spring Cloud Azure Service Bus Integration 项目:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
编写应用程序代码以发送和接收消息
配置服务总线命名空间,如以下示例所示:
spring.cloud.azure.servicebus.namespace=<service-bus-namespace-name>
创建消息接收方。
若要将应用程序用作事件接收器,请通过指定以下信息来配置输入绑定器:
声明一个定义消息处理逻辑的
Consumer
bean。 例如,以下Consumer
bean 被命名为consume
:@Bean public Consumer<Message<String>> consume() { return message -> { System.out.printf("New message received: '%s'.%n", message.getPayload()); }; }
通过替换
<service-bus-queue-name>
占位符,添加配置以指定要使用的queue
名称,如以下示例所示:# name for the `Consumer` bean spring.cloud.function.definition=consume spring.cloud.stream.bindings.consume-in-0.destination=<service-bus-queue-name>
注意
若要从服务总线订阅使用,请务必更改
consume-in-0
绑定属性,如以下示例所示:spring.cloud.stream.bindings.consume-in-0.destination=<service-bus-topic-name> spring.cloud.stream.bindings.consume-in-0.group=<service-bus-subscription-name>
创建消息发送方。
若要将应用程序用作事件源,请通过指定以下信息来配置输出绑定器:
定义一个
Supplier
bean,用于定义消息在应用程序中的来源。@Bean return () -> { System.out.println("Sending a message."); return MessageBuilder.withPayload("Hello world").build(); }; }
通过替换以下示例中的
<your-servicebus-queue-name>
占位符,添加配置以指定要发送的queue
名称:# "consume" is added from the previous step spring.cloud.function.definition=consume;supply spring.cloud.stream.bindings.supply-out-0.destination=<your-servicebus-queue-name> spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
注意
若要发送到服务总线主题,请务必将
entity-type
更改为topic
。
启动应用程序。 你将看到与以下示例类似的日志:
Sending a message. New message received: 'Hello world'.
部署到 Azure Spring Apps
现在,你已在本地运行 Spring Boot 应用程序,是时候将其转移到生产环境了。 借助 Azure Spring Apps,可以轻松地将 Spring Boot 应用程序部署到 Azure,不需更改任何代码。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅在 Azure Spring Apps 中部署你的第一个应用程序。
后续步骤
另请参阅
有关可用于 Microsoft Azure 的更多 Spring Boot Starters 的更多信息,请参阅什么是 Spring Cloud Azure?