你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusReceiverClient 类
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusReceiverClient
- com.
实现
public final class ServiceBusReceiverClient
implements AutoCloseable
负责从Azure 服务总线上的队列或主题/订阅接收ServiceBusReceivedMessage的同步接收方。
本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,我们建议使用 托管标识 在生产环境中进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。
示例:创建接收方并接收消息
下面的代码示例演示如何创建和使用同步客户端 ServiceBusReceiverClient 来接收来自服务总线订阅的消息。 当收到 10 条消息或经过 30 秒后,接收操作将返回。 默认情况下,使用 接收 PEEK_LOCK 消息,客户必须使用接收方客户端上的一种结算方法结算其消息。 “Settling receive operations”提供有关消息结算的其他信息。
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.receiver()
.topicName(topicName)
.subscriptionName(subscriptionName)
.buildClient();
// Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
// happens first.
IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
messages.forEach(message -> {
System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());
// If able to process message, complete it. Otherwise, abandon it and allow it to be
// redelivered.
if (isMessageProcessed) {
receiver.complete(message);
} else {
receiver.abandon(message);
}
});
// When program ends, or you're done receiving all messages, dispose of the receiver.
// Clients should be long-lived objects as they
// require resources and time to establish a connection to the service.
receiver.close();
方法摘要
方法继承自 java.lang.Object
方法详细信息
abandon
public void abandon(ServiceBusReceivedMessage message)
放弃 ServiceBusReceivedMessage。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。
Parameters:
abandon
public void abandon(ServiceBusReceivedMessage message, AbandonOptions options)
ServiceBusReceivedMessage放弃 并更新消息的属性。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。
Parameters:
close
public void close()
通过关闭指向服务的基础链接释放使用者。
commitTransaction
public void commitTransaction(ServiceBusTransactionContext transactionContext)
提交事务及其关联的所有操作。
创建和使用事务
ServiceBusTransactionContext transaction = receiver.createTransaction();
// Process messages and associate operations with the transaction.
ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
receiver.commitTransaction(transaction);
Parameters:
complete
public void complete(ServiceBusReceivedMessage message)
完成 ServiceBusReceivedMessage。 这会从服务中删除消息。
Parameters:
complete
public void complete(ServiceBusReceivedMessage message, CompleteOptions options)
完成 ServiceBusReceivedMessage。 这会从服务中删除消息。
Parameters:
createTransaction
public ServiceBusTransactionContext createTransaction()
在服务总线上启动新事务。 ServiceBusTransactionContext应将 传递给需要在此事务中的所有操作。
示例:创建和使用事务
ServiceBusTransactionContext transaction = receiver.createTransaction();
// Process messages and associate operations with the transaction.
ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
receiver.commitTransaction(transaction);
Returns:
deadLetter
public void deadLetter(ServiceBusReceivedMessage message)
ServiceBusReceivedMessage将 移到死信子队列。
Parameters:
deadLetter
public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)
ServiceBusReceivedMessage使用死信原因、错误说明和/或修改的属性将 移动到死信子队列。
Parameters:
defer
public void defer(ServiceBusReceivedMessage message)
ServiceBusReceivedMessage延迟 。 这会将消息移动到延迟的子队列中。
Parameters:
defer
public void defer(ServiceBusReceivedMessage message, DeferOptions options)
ServiceBusReceivedMessage使用带有修改的消息属性的锁定令牌来延迟 。 这会将消息移动到延迟的子队列中。
Parameters:
getEntityPath
public String getEntityPath()
获取此客户端与之交互的服务总线资源。
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
获取与连接关联的完全限定的服务总线命名空间。 这可能类似于 {yournamespace}.servicebus.windows.net
。
Returns:
getIdentifier
public String getIdentifier()
获取 实例的 ServiceBusReceiverClient标识符。
Returns:
getSessionId
public String getSessionId()
如果此接收方是会话接收器,则获取会话的 SessionId。
Returns:
getSessionState
public byte[] getSessionState()
如果此接收器是会话接收器,则获取会话的状态。
Returns:
peekMessage
public ServiceBusReceivedMessage peekMessage()
在不更改接收方或消息源的状态的情况下读取下一个活动消息。 对 的第一次调用 peekMessage()
将提取此接收方的第一条活动消息。 每个后续调用都会提取实体中的后续消息。
Returns:
peekMessage
public ServiceBusReceivedMessage peekMessage(long sequenceNumber)
从给定的序列号开始,在不更改接收方或消息源的状态的情况下读取活动消息。
Parameters:
Returns:
peekMessages
public IterableStream
在不更改接收方或消息源的状态的情况下读取下一批活动消息。
Parameters:
Returns:
peekMessages
public IterableStream
从给定的序列号开始,在不更改接收方或消息源的状态的情况下读取下一批活动消息。
Parameters:
Returns:
receiveDeferredMessage
public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)
接收延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。
Parameters:
Returns:
sequenceNumber
的延迟消息。receiveDeferredMessageBatch
public IterableStream
接收一批延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。
Parameters:
Returns:
receiveMessages
public IterableStream
从服务总线实体接收 的 ServiceBusReceivedMessage 可迭代流。 接收操作将等待默认 1 分钟才能收到消息,然后才超时。可以使用 替代它 receiveMessages(int maxMessages, Duration maxWaitTime)。
客户端使用下面的 AMQP 链接来接收消息;如果当前链接遇到可重试的错误,则客户端将以透明方式转换为新的 AMQP 链接。 当客户端遇到不可重试的错误或重试次数耗尽时,迭代 (例如,forEach) IterableStream<T> 由 receiveMessages API 的进一步调用返回的 会将错误引发到应用程序。 应用程序收到此错误后,应用程序应重置客户端,即关闭当前 ServiceBusReceiverClient 客户端并创建新客户端以继续接收消息。
注意:不可重试错误的一些示例包括 - 应用程序尝试连接到不存在的队列,在接收时删除或禁用队列,用户显式启动异地 DR。 这些是服务总线向客户端传达发生不可重试错误的某些事件。
Parameters:
Returns:
maxMessages
消息的 。receiveMessages
public IterableStream
从服务总线实体接收 的 ServiceBusReceivedMessage 可迭代流。 默认接收模式为 ,PEEK_LOCK除非在使用 ServiceBusReceiverClientBuilder#receiveMode(ServiceBusReceiveMode)创建 ServiceBusReceiverClient 期间更改了它。
客户端使用下面的 AMQP 链接来接收消息;如果当前链接遇到可重试的错误,则客户端将以透明方式转换为新的 AMQP 链接。 当客户端遇到不可重试的错误或重试次数耗尽时,迭代 (例如,forEach) IterableStream<T> 由 receiveMessages API 的进一步调用返回的 会将错误引发到应用程序。 应用程序收到此错误后,应用程序应重置客户端,即关闭当前 ServiceBusReceiverClient 客户端并创建新客户端以继续接收消息。
注意:不可重试错误的一些示例包括 - 应用程序尝试连接到不存在的队列,在接收时删除或禁用队列,用户显式启动异地 DR。 这些是服务总线向客户端传达发生不可重试错误的某些事件。
Parameters:
Returns:
maxMessages
消息的 。renewMessageLock
public OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)
续订指定消息的锁。 将根据实体上指定的设置续订锁。 在 PEEK_LOCK 模式下收到消息时,此接收方实例在创建队列 (LockDuration) 期间指定的持续时间内,该消息将在此接收方实例的服务器上锁定。 如果消息的处理时间超过此持续时间,则需要续订锁。 对于每次续订,锁将重置为实体的 LockDuration 值。
Parameters:
Returns:
renewMessageLock
public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer
为具有给定锁的消息启动自动锁定续订。
Parameters:
renewSessionLock
public OffsetDateTime renewSessionLock()
如果此接收器是会话接收器,则设置会话的状态。
Returns:
renewSessionLock
public void renewSessionLock(Duration maxLockRenewalDuration, Consumer
启动此接收方工作的会话的自动锁定续订。
Parameters:
rollbackTransaction
public void rollbackTransaction(ServiceBusTransactionContext transactionContext)
回滚给定的事务及其关联的所有操作。
创建和使用事务
ServiceBusTransactionContext transaction = receiver.createTransaction();
// Process messages and associate operations with the transaction.
ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
receiver.commitTransaction(transaction);
Parameters:
setSessionState
public void setSessionState(byte[] sessionState)
如果此接收器是会话接收器,则设置会话的状态。
Parameters: