你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusClientBuilder 类
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusClientBuilder
- com.
实现
public final class ServiceBusClientBuilder
implements TokenCredentialTrait<ServiceBusClientBuilder>, AzureNamedKeyCredentialTrait<ServiceBusClientBuilder>, ConnectionStringTrait<ServiceBusClientBuilder>, AzureSasCredentialTrait<ServiceBusClientBuilder>, AmqpTrait<ServiceBusClientBuilder>, ConfigurationTrait<ServiceBusClientBuilder>
此类提供 Fluent 生成器 API,用于帮助实例化客户端,以便向/从服务总线实体发送和接收消息。
需要凭据才能对Azure 服务总线执行操作。 可以使用以下方法之一设置它们:
- connectionString(String connectionString)与服务总线命名空间连接字符串。
- credential(String fullyQualifiedNamespace, TokenCredential credential)、 credential(String fullyQualifiedNamespace, AzureSasCredential credential)和 credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential) 重载可与有权访问完全限定的服务总线命名空间的相应凭据一起使用。
- credential(TokenCredential credential)、 credential(AzureSasCredential credential)和 credential(AzureNamedKeyCredential credential) 重载可以与其各自的凭据一起使用。 fullyQualifiedNamespace(String fullyQualifiedNamespace)必须 设置。
以下示例 DefaultAzureCredential
中使用的凭据用于身份验证。 它适用于大多数方案,包括本地开发和生产环境。 此外,我们建议使用 托管标识 在生产环境中进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。
客户端和子生成器
ServiceBusClientBuilder 可以实例化多个客户端。 要实例化的客户端取决于用户是在发布还是接收消息,以及实体是否启用了 服务总线会话 。
- 发送消息:使用 sender() 子生成器创建 ServiceBusSenderAsyncClient 和 ServiceBusSenderClient。
- 接收消息:使用 receiver() 子生成器创建 ServiceBusReceiverAsyncClient 和 ServiceBusReceiverAsyncClient。
- 从已启用会话的服务总线实体接收消息:使用 sessionReceiver() 子生成器创建 ServiceBusSessionReceiverAsyncClient 和 ServiceBusSessionReceiverClient。
- 使用基于回调的处理器接收消息:使用 processor() 子生成器创建 ServiceBusProcessorClient。
- 使用基于回调的处理器从已启用会话的服务总线实体接收消息 :使用 sessionProcessor() 子生成器创建 ServiceBusProcessorClient。
发送消息
示例:实例化同步发件人并发送消息
下面的代码示例演示如何创建同步客户端 ServiceBusSenderClient 并发送消息。 fullyQualifiedNamespace
是服务总线命名空间的主机名。 通过 Azure 门户导航到服务总线命名空间后,它列在“概要”面板下。 使用的凭据是 DefaultAzureCredential
因为它合并了部署和开发中常用的凭据,并根据其运行环境选择要使用的凭据。 当性能很重要时,请考虑使用 ServiceBusMessageBatch 一次发布多条消息。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sender()
.queueName(queueName)
.buildClient();
sender.sendMessage(new ServiceBusMessage("Foo bar"));
使用消息
有多个客户端使用来自服务总线实体 (的消息,这些客户端未) 启用 服务总线会话 。
示例:实例化异步接收器
下面的代码示例演示如何创建异步接收器。 使用的 DefaultAzureCredential
凭据用于身份验证。 它适用于大多数方案,包括本地开发和生产环境。 PEEK_LOCK强烈建议使用 和 disableAutoComplete() ,以便用户可以控制消息解决。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.disableAutoComplete()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
示例:实例化 ServiceBusProcessorClient
下面的代码示例演示如何创建处理器客户端。 对于大多数生产方案,建议使用处理器客户端,因为它提供连接恢复。 使用的 DefaultAzureCredential
凭据用于身份验证。 它适用于大多数方案,包括本地开发和生产环境。 PEEK_LOCK强烈建议使用 和 disableAutoComplete() ,以便用户可以控制消息解决。
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
使用来自已启用会话的服务总线实体的消息
服务总线支持通过 服务总线会话对无限的消息序列进行联合和有序处理。 会话可用作先入先出 (FIFO) 消息处理。 队列和主题/订阅支持服务总线会话,但是,必须在 创建实体时启用它。
示例:向已启用会话的队列发送消息
以下代码片段演示如何向已启用 服务总线会话的 队列发送消息。 将属性设置为 setMessageId(String messageId) “greetings”会将消息发送到 ID 为“greetings”的服务总线会话。
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, new DefaultAzureCredentialBuilder().build())
.sender()
.queueName(sessionEnabledQueueName)
.buildClient();
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
.setSessionId("greetings");
sender.sendMessage(message);
// Dispose of the sender.
sender.close();
示例:从第一个可用会话接收消息
若要处理来自第一个可用会话的消息,请切换到 ServiceBusSessionReceiverClientBuilder 并生成会话接收方客户端。 使用 acceptNextSession() 查找要处理消息的第一个可用会话。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
示例:处理来自所有会话的消息
下面的代码示例演示如何创建 ServiceBusProcessorClient 处理队列中所有可用会话的 。 ServiceBusSessionProcessorClientBuilder#maxConcurrentSessions(int) 指示处理器将同时处理的会话数。 使用的凭据用于 DefaultAzureCredential
身份验证。 它适用于大多数方案,包括本地开发和生产环境。 PEEK_LOCK强烈建议和 disableAutoComplete() ,以便用户可以控制消息解决。
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
连接共享
创建与服务总线的连接需要资源。 如果体系结构允许,应用程序应在客户端之间共享连接,这可以通过共享顶级生成器来实现,如下所示。
在客户端之间共享连接
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// Any clients created from this builder will share the underlying connection.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential);
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
.receiver()
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.queueName(queueName)
.buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
.sender()
.queueName(queueName)
.buildClient();
// Use the clients and finally close them.
try {
sender.sendMessage(new ServiceBusMessage("payload"));
receiver.receiveMessages(1);
} finally {
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
sender.close();
receiver.close();
}
构造函数摘要
构造函数 | 说明 |
---|---|
ServiceBusClientBuilder() |
使用默认传输 AMQP创建新实例。 |
方法摘要
方法继承自 java.lang.Object
构造函数详细信息
ServiceBusClientBuilder
public ServiceBusClientBuilder()
使用默认传输 AMQP创建新实例。
方法详细信息
clientOptions
public ServiceBusClientBuilder clientOptions(ClientOptions clientOptions)
ClientOptions设置要从此生成器生成的客户端发送的 ,启用某些属性的自定义,并支持添加自定义标头信息。 有关详细信息, ClientOptions 请参阅文档。
Parameters:
Returns:
configuration
public ServiceBusClientBuilder configuration(Configuration configuration)
设置在构造服务客户端期间使用的配置存储。 如果未指定,则使用默认配置存储来配置服务总线客户端。 用于 NONE 在构造过程中绕过配置设置。
Parameters:
Returns:
connectionString
public ServiceBusClientBuilder connectionString(String connectionString)
设置服务总线命名空间或特定服务总线资源的连接字符串。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(AzureNamedKeyCredential credential)
使用服务总线资源的共享访问策略设置凭据。 可以在 Azure 门户或 Azure CLI 上找到共享访问策略。 例如,在门户上,“共享访问策略”具有“policy”及其“主密钥”和“辅助密钥”。 的“name”属性 AzureNamedKeyCredential 是门户上的“策略”,“key”属性可以是“主键”或“辅助密钥”。 此方法采用 connectionString(String connectionString) 不同形式相同的信息。 但它允许你更新名称和密钥。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(AzureSasCredential credential)
使用服务总线资源的共享访问签名设置凭据。 请参阅 使用共享访问签名的服务总线访问控制。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(TokenCredential credential)
TokenCredential设置用于授权发送到服务的请求的 。 有关正确使用TokenCredential该类型的更多详细信息,请参阅适用于 Java 的 Azure SDK 标识和身份验证文档。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential)
使用服务总线资源的共享访问策略设置凭据。 可以在 Azure 门户或 Azure CLI 上找到共享访问策略。 例如,在门户上,“共享访问策略”具有“policy”及其“主密钥”和“辅助密钥”。 的“name”属性 AzureNamedKeyCredential 是门户上的“策略”,“key”属性可以是“主键”或“辅助密钥”。 此方法采用 connectionString(String connectionString) 不同形式相同的信息。 但它允许你更新名称和密钥。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential)
使用服务总线资源的共享访问签名设置凭据。 请参阅 使用共享访问签名的服务总线访问控制。
Parameters:
Returns:
credential
public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenCredential credential)
通过使用 TokenCredential 服务总线资源的 来设置凭据。 azure-identity 具有多个 TokenCredential 实现,可用于对服务总线资源的访问权限进行身份验证。
Parameters:
Returns:
customEndpointAddress
public ServiceBusClientBuilder customEndpointAddress(String customEndpointAddress)
在连接到服务总线服务时设置自定义终结点地址。 当网络不允许连接到标准Azure 服务总线终结点地址,但允许通过中介进行连接时,这非常有用。 例如:https://my.custom.endpoint.com:55300。
如果未指定端口,则使用 的默认 transportType(AmqpTransportType transportType) 端口。
Parameters:
Returns:
enableCrossEntityTransactions
public ServiceBusClientBuilder enableCrossEntityTransactions()
在与服务总线的连接上启用跨实体事务。 仅当事务范围跨越不同的服务总线实体时,才使用此功能。 此功能是通过服务器端的一个“发送方式”实体路由所有消息来实现的,如下所述。 为多个实体创建客户端后,执行操作的第一个实体将成为实体,所有后续发送将通过 (“send-via”实体) 路由。 这使服务能够执行旨在跨多个实体的事务。 这意味着执行其第一个操作的后续实体需要是发送方,或者如果它们是接收方,则它们需要与通过 (路由所有发送的初始实体位于同一实体上,否则服务将无法确保提交事务,因为它无法通过不同的实体) 路由接收操作。 例如,如果你有 SenderA (for entity A) 和 ReceiverB (For entity B) (从启用了跨实体事务的客户端创建),则需要先接收 ReceiverB 才能使此功能正常工作。 如果首先发送到实体 A,然后尝试从实体 B 接收,则会引发异常。
避免在此客户端上使用非事务 API
由于此功能将设置与经过优化以启用此功能的服务总线的连接。 设置所有客户端后,使用的第一个接收方或发送方会将“send-via”队列初始化为单个消息传输实体。 所有消息都将通过此队列流动。 因此,此客户端不适合任何非事务 API。
何时不启用此功能
如果事务仅涉及一个服务总线实体。 例如,你从一个队列/订阅接收,并且你想要解决属于一个事务的一部分的你自己的消息。
Returns:
fullyQualifiedNamespace
public ServiceBusClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
设置服务总线的完全限定命名空间。
Parameters:
Returns:
processor
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processor()
用于配置ServiceBusProcessorClient实例ServiceBusProcessorClientBuilder的 的新实例。
Returns:
proxyOptions
public ServiceBusClientBuilder proxyOptions(ProxyOptions proxyOptions)
设置要用于 的 ServiceBusSenderAsyncClient代理配置。 配置代理后, AMQP_WEB_SOCKETS 必须用于传输类型。
Parameters:
Returns:
receiver
public ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiver()
用于配置服务总线消息接收器的 的新实例 ServiceBusReceiverClientBuilder 。
Returns:
retryOptions
public ServiceBusClientBuilder retryOptions(AmqpRetryOptions retryOptions)
设置服务总线客户端的重试选项。 如果未指定,则使用默认重试选项。
Parameters:
Returns:
ruleManager
public ServiceBusClientBuilder.ServiceBusRuleManagerBuilder ruleManager()
用于配置服务总线规则管理器实例 ServiceBusRuleManagerBuilder 的新实例。
Returns:
sender
public ServiceBusClientBuilder.ServiceBusSenderClientBuilder sender()
用于配置服务总线消息发送方的新实例 ServiceBusSenderClientBuilder 。
Returns:
sessionProcessor
public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder sessionProcessor()
用于配置处理会话的服务总线处理器实例的新 ServiceBusSessionProcessorClientBuilder 实例。
Returns:
sessionReceiver
public ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiver()
用于配置会话感知服务总线消息接收器的新实例ServiceBusSessionReceiverClientBuilder。
Returns:
transportType
public ServiceBusClientBuilder transportType(AmqpTransportType transportType)
设置与Azure 服务总线通信所依据的传输类型。 默认值为 AMQP。
Parameters:
Returns: