你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 Java 的Azure 服务总线客户端库 - 版本 7.14.4
Microsoft Azure 服务总线是一种完全托管的企业集成消息中转站。 服务总线可以分离应用程序和服务。 服务总线为异步传输数据和状态提供可靠且安全的平台。 数据通过消息在不同的应用程序和服务之间传输。 如果想要了解有关Azure 服务总线的详细信息,请查看:什么是服务总线
Azure 服务总线客户端库允许发送和接收Azure 服务总线消息,并可用于:
- 传输业务数据,例如销售或采购订单、日志或库存变动。
- 分离应用程序以提高应用程序和服务的可靠性和可伸缩性。 客户端和服务不必同时联机。
- 启用发布者和订阅者之间的 1:n 关系。
- 实现要求消息排序或消息延迟的工作流。
源代码 | API 参考文档 | 产品文档 | 样品 | 包 (Maven)
入门
先决条件
- Java 开发工具包 (版本 8 或更高版本的 JDK)
- Maven
- Microsoft Azure 订阅
- 可以在以下位置创建免费帐户: https://azure.microsoft.com
- Azure 服务总线 实例
- 使用 Azure 门户创建服务总线实例的分步指南
若要在 Azure 中快速创建所需的服务总线资源并接收它们的连接字符串,可以通过单击以下命令部署示例模板:
添加包
包括 BOM 文件
请将 azure-sdk-bom 包含在项目中,以依赖于库的正式发布 (GA) 版本。 在以下代码段中,将 {bom_version_to_target} 占位符替换为版本号。 若要详细了解 BOM,请参阅 AZURE SDK BOM 自述文件。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
然后在没有版本标记的依赖项部分中包含直接依赖项。
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
</dependency>
</dependencies>
包括直接依赖项
如果要依赖于 BOM 中不存在的特定版本的库,请将直接依赖项添加到项目中,如下所示。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.14.4</version>
</dependency>
验证客户端
若要使服务总线客户端库与服务总线交互,需要了解如何与其连接和授权。
使用连接字符串创建服务总线客户端
进行身份验证的最简单方法是使用连接字符串,该字符串在创建服务总线命名空间时自动创建。 如果不熟悉 Azure 中的共享访问策略,建议按照分步指南 获取服务总线连接字符串。
异步和同步服务总线发送方和接收方客户端均使用 ServiceBusClientBuilder
实例化。 以下代码片段分别创建同步服务总线发送方和异步接收方。
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sender()
.queueName("<< QUEUE NAME >>")
.buildClient();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.receiver()
.topicName("<< TOPIC NAME >>")
.subscriptionName("<< SUBSCRIPTION NAME >>")
.buildAsyncClient();
使用 Microsoft 标识平台 (以前的 Azure Active Directory) 创建服务总线客户端
Azure SDK for Java 支持 Azure 标识包,因此可以轻松地从Microsoft 标识平台获取凭据。 首先,添加包:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.10.1</version>
</dependency>
- 已知问题:pom.xml 文件应在客户端库之前
azure-identity
列出azure-messaging-servicebus
。 此问题已使用azure-identity:1.2.1
解决。 查看此处了解更多详细信息。
请求凭据的实现方式位于 包下 com.azure.identity.credential
。 下面的示例演示如何使用 Azure Active Directory (AAD) 应用程序客户端密码通过 Azure 服务总线 授权。
使用 DefaultAzureCredential 授权
使用 DefaultAzureCredential 进行授权最简单。 它查找要在其运行环境中使用的最佳凭据。 有关将 Azure Active Directory 授权与服务总线配合使用的详细信息,请参阅 相关文档。
使用返回的令牌凭据对客户端进行身份验证:
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.credential("<<fully-qualified-namespace>>", credential)
.receiver()
.queueName("<<queue-name>>")
.buildAsyncClient();
关键概念
可以与服务总线命名空间中的主要资源类型进行交互,其中可以存在多个资源类型,并且实际消息传输发生在哪个资源类型上。 命名空间通常用作应用程序容器:
- 队列允许发送和接收先入先出排序的消息。它通常用于点到点通信。
- 主题更适合发布者和订阅者方案。 主题将消息发布到订阅,其中多个消息可以同时存在。
- 订阅接收来自主题的消息。 每个订阅都是独立的,并接收发送到主题的消息的副本。
服务总线客户端
生成器 ServiceBusClientBuilder
用于创建所有服务总线客户端。
ServiceBusSenderClient
负责发送到ServiceBusMessage
Azure 服务总线上的特定队列或主题的同步发送方。ServiceBusSenderAsyncClient
负责发送到ServiceBusMessage
Azure 服务总线上的特定队列或主题的异步发送方。ServiceBusReceiverClient
负责从Azure 服务总线上的特定队列或主题接收ServiceBusMessage
的同步接收方。ServiceBusReceiverAsyncClient
负责从Azure 服务总线上的特定队列或主题接收ServiceBusMessage
的异步接收方。
示例
发送消息
需要创建异步 ServiceBusSenderAsyncClient
或同步 ServiceBusSenderClient
来发送消息。 每个发送方都可以将消息发送到队列或主题。
以下代码片段创建一个同步 ServiceBusSenderClient
,用于将消息发布到队列。
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sender()
.queueName("<< QUEUE NAME >>")
.buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
new ServiceBusMessage("Hello world").setMessageId("1"),
new ServiceBusMessage("Bonjour").setMessageId("2"));
sender.sendMessages(messages);
// When you are done using the sender, dispose of it.
sender.close();
接收消息
若要接收消息,需要创建一个 ServiceBusProcessorClient
,其中包含传入消息的回调以及进程中发生的任何错误。 然后,可以根据需要启动和停止客户端。
在 PeekLock 模式下接收消息时,它会告知中转站应用程序逻辑要确定 (例如完成、放弃) 显式接收的消息。
// Sample code that processes a single message which is received in PeekLock mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (Exception completionError) {
System.out.printf("Completion of the message %s failed\n", message.getMessageId());
completionError.printStackTrace();
}
} else {
try {
context.abandon();
} catch (Exception abandonError) {
System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
abandonError.printStackTrace();
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.processor()
.queueName("<< QUEUE NAME >>")
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background and returns immediately
processorClient.start();
使用 ReceiveAndDelete 模式接收消息时,告知中转站将其发送到接收客户端的所有消息视为在发送时已解决。
// Sample code that processes a single message which is received in ReceiveAndDelete mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("handler processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
System.err.println("Error occurred while receiving message: " + errorContext.getException());
};
// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.processor()
.queueName("<< QUEUE NAME >>")
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background and returns immediately
processorClient.start();
有四种方法可以使用传递到回调的消息上下文中的方法解决消息。
- 完成 - 导致消息从队列或主题中删除。
- 放弃 - 释放接收方对消息的锁定,允许其他接收方接收消息。
- 延迟 - 通过正常方式延迟接收消息。 若要接收延迟的消息,需要保留消息的序列号。
- 死信 - 将消息移动到 死信队列。 这将阻止再次接收消息。 若要从死信队列接收消息,需要一个作用域为死信队列的接收方。
从已启用会话的队列或主题发送和接收
使用会话需要创建启用了会话的队列或订阅。 可以在“消息会话”中详细了解如何配置此功能。
使用 Azure 服务总线会话,可以连贯有序的方式处理一系列无限多的相关消息。 会话可以在先进先出 (FIFO) 和请求-响应模式下使用。 任何发送方都可以在将消息提交到主题或队列时创建会话,方法是将 ServiceBusMessage.setSessionId(String)
属性设置为会话唯一的某个应用程序定义的标识符。
与未启用会话的队列或订阅不同,只有单个接收方可以随时从会话中读取数据。 当接收方提取会话时,服务总线会锁定该接收方的会话,并且它对该会话中的消息具有独占访问权限。
向会话发送消息
ServiceBusSenderClient
为已启用会话的队列或主题订阅创建 。 设置 ServiceBusMessage.setSessionId(String)
会将 ServiceBusMessage
消息发布到该会话。 如果该会话不存在,则会创建该会话。
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
.setSessionId("greetings");
sender.sendMessage(message);
从会话接收消息
从会话接收消息类似于从启用非会话的队列或订阅接收消息。 区别在于生成器和使用的类。
在非会话情况下,将使用子生成器 processor()
。 对于会话,请使用子生成器 sessionProcessor()
。 两个子生成器都将创建配置为在会话或非会话服务总线实体上运行的 实例 ServiceBusProcessorClient
。 对于会话处理器,也可以传递希望处理器同时处理的最大会话数。
创建死信队列接收方
Azure 服务总线队列和主题订阅提供辅助子队列,称为死信队列 (DLQ) 。 死信队列不需要显式创建,并且不能删除或以其他方式独立于主实体进行管理。 对于启用会话或非会话队列或主题订阅,可以按如下所示的相同方式创建死信接收器。 在此处详细了解死信队列。
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.receiver() // Use this for session or non-session enabled queue or topic/subscriptions
.topicName("<< TOPIC NAME >>")
.subscriptionName("<< SUBSCRIPTION NAME >>")
.subQueue(SubQueue.DEAD_LETTER_QUEUE)
.buildClient();
在客户端之间共享连接
创建与服务总线的物理连接需要资源。 应用程序应共享连接
客户端之间,可通过共享顶级生成器来实现,如下所示。
// Create shared builder.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>");
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
.receiver()
.queueName("<< QUEUE NAME >>")
.buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
.sender()
.queueName("<< QUEUE NAME >>")
.buildClient();
何时使用“ServiceBusProcessorClient”。
何时使用“ServiceBusProcessorClient”、“ServiceBusReceiverClient”或 ServiceBusReceiverAsyncClient? 处理器是使用“ServiceBusReceiverAsyncClient”构建的,它提供了一种在“PEEK_LOCK”模式下通过默认自动完成和自动续订消息锁接收消息的便捷方式。 如果应用程序尚未完全移动到异步接收方客户端,并且想要在同步模式下处理消息,则处理器是合适的。
处理器会永久接收消息,因为它会从内部的网络错误中恢复。
对每条消息进行“ServiceBusProcessorClient:processMessage () ”函数调用。 或者,也可以使用“ServiceBusReceiverClient”,它是一个较低级别的客户端,提供更广泛的 API。 如果异步处理为
适用于应用程序,可以使用“ServiceBusReceiverAsyncClient”。
故障排除
启用客户端日志记录
Azure SDK for Java 提供一致的日志记录案例,以帮助排查应用程序错误并加快其解决速度。 生成的日志会在到达终端状态之前捕获应用程序的流,以帮助查找根本问题。 查看 日志记录 Wiki,获取有关启用日志记录的指南。
启用 AMQP 传输日志记录
如果启用客户端日志记录不足以诊断问题。 可以启用对基础 AMQP 库 Qpid Proton-J 中的文件的日志记录。 Qpid Proton-J 使用 java.util.logging
。 可以通过使用以下内容创建配置文件来启用日志记录。 或者为实现设置 proton.trace.level=ALL
和所需的 java.util.logging.Handler
配置选项。 可在 Java 8 SDK javadoc 中找到实现类及其选项。
若要跟踪 AMQP 传输帧,请设置环境变量: PN_TRACE_FRM=1
。
示例“logging.properties”文件
下面的配置文件将来自 proton-j 的跟踪输出记录到文件“proton-trace.log”。
handlers=java.util.logging.FileHandler
.level=OFF
proton.trace.level=ALL
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=proton-trace.log
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n
常见异常
AMQP 异常
这是 AMQP 相关失败的一般异常,其中包括 AMQP 错误作为 ErrorCondition
,以及导致此异常的上下文为 AmqpErrorContext
。 isTransient
是一个布尔值,指示异常是否为暂时性错误。 如果发生暂时性 AMQP 异常,客户端库会重试该操作的次数,只要 AmqpRetryOptions 允许。 之后,操作会失败,异常将传播回用户。
AmqpErrorCondition
包含 AMQP 协议通用且由 Azure 服务使用的错误条件。 引发 AMQP 异常时,检查错误条件字段可以告知开发人员 AMQP 异常发生的原因,以及如何在可能的情况下缓解此异常。 可在 OASIS AMQP 版本 1.0 传输错误中找到所有 AMQP 异常的列表。
解决 AMQP 异常表示的特定异常的建议方法是遵循 服务总线消息传送异常 指南。
了解 API 行为
此处的文档提供有关使用同步 receiveMessages
API 获取多个消息时的预期行为, (隐式预提取) 。
后续步骤
除了讨论的内容之外,Azure 服务总线客户端库还提供对许多其他方案的支持,以帮助利用 Azure 服务总线 服务的完整功能集。 为了帮助探索其中一些方案, 此处提供了以下一组示例。
贡献
如果你想成为此项目的活动参与者,请参阅我们的贡献指南了解详细信息。