你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusReceiverClient 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverClient

实现

public final class ServiceBusReceiverClient
implements AutoCloseable

负责从Azure 服务总线上的队列或主题/订阅接收ServiceBusReceivedMessage同步接收方。

本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,我们建议使用 托管标识 在生产环境中进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。

示例:创建接收方并接收消息

下面的代码示例演示如何创建和使用同步客户端 ServiceBusReceiverClient 来接收来自服务总线订阅的消息。 当收到 10 条消息或经过 30 秒后,接收操作将返回。 默认情况下,使用 接收 PEEK_LOCK 消息,客户必须使用接收方客户端上的一种结算方法结算其消息。 “Settling receive operations”提供有关消息结算的其他信息。

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .receiver()
     .topicName(topicName)
     .subscriptionName(subscriptionName)
     .buildClient();

 // Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
 // happens first.
 IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
 messages.forEach(message -> {
     System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());

     // If able to process message, complete it. Otherwise, abandon it and allow it to be
     // redelivered.
     if (isMessageProcessed) {
         receiver.complete(message);
     } else {
         receiver.abandon(message);
     }
 });

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 receiver.close();

方法摘要

修饰符和类型 方法和描述
void abandon(ServiceBusReceivedMessage message)

放弃 ServiceBusReceivedMessage

void abandon(ServiceBusReceivedMessage message, AbandonOptions options)

ServiceBusReceivedMessage放弃 并更新消息的属性。

void close()

通过关闭指向服务的基础链接释放使用者。

void commitTransaction(ServiceBusTransactionContext transactionContext)

提交事务及其关联的所有操作。

void complete(ServiceBusReceivedMessage message)

完成 ServiceBusReceivedMessage

void complete(ServiceBusReceivedMessage message, CompleteOptions options)

完成 ServiceBusReceivedMessage

ServiceBusTransactionContext createTransaction()

在服务总线上启动新事务。

void deadLetter(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage将 移到死信子队列。

void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

ServiceBusReceivedMessage使用死信原因、错误说明和/或修改的属性将 移动到死信子队列。

void defer(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage延迟 。

void defer(ServiceBusReceivedMessage message, DeferOptions options)

ServiceBusReceivedMessage使用带有修改的消息属性的锁定令牌来延迟 。

String getEntityPath()

获取此客户端与之交互的服务总线资源。

String getFullyQualifiedNamespace()

获取与连接关联的完全限定的服务总线命名空间。

String getIdentifier()

获取 实例的 ServiceBusReceiverClient标识符。

String getSessionId()

如果此接收方是会话接收器,则获取会话的会话 ID。

byte[] getSessionState()

如果此接收器是会话接收器,则获取会话的状态。

ServiceBusReceivedMessage peekMessage()

在不更改接收方或消息源的状态的情况下读取下一个活动消息。

ServiceBusReceivedMessage peekMessage(long sequenceNumber)

从给定的序列号开始,在不更改接收方或消息源的状态的情况下读取活动消息。

IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages)

在不更改接收方或消息源的状态的情况下读取下一批活动消息。

IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

从给定的序列号开始,在不更改接收方或消息源的状态的情况下读取下一批活动消息。

ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)

接收延迟 ServiceBusReceivedMessage的 。

IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers)

接收一批延迟 ServiceBusReceivedMessage的 。

IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages)

从服务总线实体接收 的 ServiceBusReceivedMessage 可迭代流。

IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages, Duration maxWaitTime)

从服务总线实体接收 的 ServiceBusReceivedMessage 可迭代流。

OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)

续订指定消息的锁。

void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer<Throwable> onError)

为具有给定锁的消息启动自动锁定续订。

OffsetDateTime renewSessionLock()

如果此接收器是会话接收器,则设置会话的状态。

void renewSessionLock(Duration maxLockRenewalDuration, Consumer<Throwable> onError)

启动此接收方工作的会话的自动锁定续订。

void rollbackTransaction(ServiceBusTransactionContext transactionContext)

回滚给定的事务及其关联的所有操作。

void setSessionState(byte[] sessionState)

如果此接收器是会话接收器,则设置会话的状态。

方法继承自 java.lang.Object

方法详细信息

abandon

public void abandon(ServiceBusReceivedMessage message)

放弃 ServiceBusReceivedMessage。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。

abandon

public void abandon(ServiceBusReceivedMessage message, AbandonOptions options)

ServiceBusReceivedMessage放弃 并更新消息的属性。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。
options - 放弃消息时要设置的选项。

close

public void close()

通过关闭指向服务的基础链接释放使用者。

commitTransaction

public void commitTransaction(ServiceBusTransactionContext transactionContext)

提交事务及其关联的所有操作。

创建和使用事务

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Parameters:

transactionContext - 要提交的事务。

complete

public void complete(ServiceBusReceivedMessage message)

完成 ServiceBusReceivedMessage。 这会从服务中删除消息。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。

complete

public void complete(ServiceBusReceivedMessage message, CompleteOptions options)

完成 ServiceBusReceivedMessage。 这会从服务中删除消息。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。
options - 用于完成消息的选项。

createTransaction

public ServiceBusTransactionContext createTransaction()

在服务总线上启动新事务。 ServiceBusTransactionContext应将 传递给需要在此事务中的所有操作。

示例:创建和使用事务

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Returns:

deadLetter

public void deadLetter(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage将 移到死信子队列。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。

deadLetter

public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

ServiceBusReceivedMessage使用死信原因、错误说明和/或修改的属性将 移动到死信子队列。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。
options - 用于对消息进行死信的选项。

defer

public void defer(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage延迟 。 这会将消息移动到延迟的子队列中。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。

defer

public void defer(ServiceBusReceivedMessage message, DeferOptions options)

ServiceBusReceivedMessage使用带有修改的消息属性的锁定令牌来延迟 。 这会将消息移动到延迟的子队列中。

Parameters:

message - 要 ServiceBusReceivedMessage 执行此操作的 。
options - 用于延迟消息的选项。

getEntityPath

public String getEntityPath()

获取此客户端与之交互的服务总线资源。

Returns:

此客户端与之交互的服务总线资源。

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

获取与连接关联的完全限定的服务总线命名空间。 这可能类似于 {yournamespace}.servicebus.windows.net

Returns:

与连接关联的完全限定的服务总线命名空间。

getIdentifier

public String getIdentifier()

获取 实例的 ServiceBusReceiverClient标识符。

Returns:

可以标识 实例的 ServiceBusReceiverClient标识符。

getSessionId

public String getSessionId()

如果此接收方是会话接收器,则获取会话的 SessionId。

Returns:

如果这不是会话接收器,则为 SessionId 或 null。

getSessionState

public byte[] getSessionState()

如果此接收器是会话接收器,则获取会话的状态。

Returns:

会话状态或 null(如果未为会话设置状态)。

peekMessage

public ServiceBusReceivedMessage peekMessage()

在不更改接收方或消息源的状态的情况下读取下一个活动消息。 对 的第一次调用 peekMessage() 将提取此接收方的第一条活动消息。 每个后续调用都会提取实体中的后续消息。

Returns:

一个扫视的 ServiceBusReceivedMessage

peekMessage

public ServiceBusReceivedMessage peekMessage(long sequenceNumber)

从给定的序列号开始,在不更改接收方或消息源的状态的情况下读取活动消息。

Parameters:

sequenceNumber - 从中读取消息的序列号。

Returns:

一个扫视的 ServiceBusReceivedMessage

peekMessages

public IterableStream peekMessages(int maxMessages)

在不更改接收方或消息源的状态的情况下读取下一批活动消息。

Parameters:

maxMessages - 要查看的最大消息数。

Returns:

peekMessages

public IterableStream peekMessages(int maxMessages, long sequenceNumber)

从给定的序列号开始,在不更改接收方或消息源的状态的情况下读取下一批活动消息。

Parameters:

maxMessages - 消息数。
sequenceNumber - 从何处开始读取消息的序列号。

Returns:

receiveDeferredMessage

public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)

接收延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。

Parameters:

sequenceNumber - 消息的 getSequenceNumber()

Returns:

具有匹配 sequenceNumber的延迟消息。

receiveDeferredMessageBatch

public IterableStream receiveDeferredMessageBatch(Iterable sequenceNumbers)

接收一批延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。

Parameters:

sequenceNumbers - 延迟消息的序列号。

Returns:

receiveMessages

public IterableStream receiveMessages(int maxMessages)

从服务总线实体接收 的 ServiceBusReceivedMessage 可迭代流。 接收操作将等待默认 1 分钟才能收到消息,然后才超时。可以使用 替代它 receiveMessages(int maxMessages, Duration maxWaitTime)

客户端使用下面的 AMQP 链接来接收消息;如果当前链接遇到可重试的错误,则客户端将以透明方式转换为新的 AMQP 链接。 当客户端遇到不可重试的错误或重试次数耗尽时,迭代 (例如,forEach) IterableStream<T> 由 receiveMessages API 的进一步调用返回的 会将错误引发到应用程序。 应用程序收到此错误后,应用程序应重置客户端,即关闭当前 ServiceBusReceiverClient 客户端并创建新客户端以继续接收消息。

注意:不可重试错误的一些示例包括 - 应用程序尝试连接到不存在的队列,在接收时删除或禁用队列,用户显式启动异地 DR。 这些是服务总线向客户端传达发生不可重试错误的某些事件。

Parameters:

maxMessages - 要接收的最大消息数。

Returns:

IterableStream<T>来自服务总线实体的最多maxMessages消息的 。

receiveMessages

public IterableStream receiveMessages(int maxMessages, Duration maxWaitTime)

从服务总线实体接收 的 ServiceBusReceivedMessage 可迭代流。 默认接收模式为 ,PEEK_LOCK除非在使用 ServiceBusReceiverClientBuilder#receiveMode(ServiceBusReceiveMode)创建 ServiceBusReceiverClient 期间更改了它。

客户端使用下面的 AMQP 链接来接收消息;如果当前链接遇到可重试的错误,则客户端将以透明方式转换为新的 AMQP 链接。 当客户端遇到不可重试的错误或重试次数耗尽时,迭代 (例如,forEach) IterableStream<T> 由 receiveMessages API 的进一步调用返回的 会将错误引发到应用程序。 应用程序收到此错误后,应用程序应重置客户端,即关闭当前 ServiceBusReceiverClient 客户端并创建新客户端以继续接收消息。

注意:不可重试错误的一些示例包括 - 应用程序尝试连接到不存在的队列,在接收时删除或禁用队列,用户显式启动异地 DR。 这些是服务总线向客户端传达发生不可重试错误的某些事件。

Parameters:

maxMessages - 要接收的最大消息数。
maxWaitTime - 客户端在超时之前等待接收消息的时间。

Returns:

IterableStream<T>来自服务总线实体的最多maxMessages消息的 。

renewMessageLock

public OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)

续订指定消息的锁。 将根据实体上指定的设置续订锁。 在 PEEK_LOCK 模式下收到消息时,此接收方实例在创建队列 (LockDuration) 期间指定的持续时间内,该消息将在此接收方实例的服务器上锁定。 如果消息的处理时间超过此持续时间,则需要续订锁。 对于每次续订,锁将重置为实体的 LockDuration 值。

Parameters:

message - 要 ServiceBusReceivedMessage 执行锁续订的 。

Returns:

消息的新过期时间。

renewMessageLock

public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer onError)

为具有给定锁的消息启动自动锁定续订。

Parameters:

message - 要 ServiceBusReceivedMessage 执行自动锁定续订的 。
maxLockRenewalDuration - 持续续订锁定令牌的最长持续时间。
onError - 在锁续订期间发生错误时调用的函数。

renewSessionLock

public OffsetDateTime renewSessionLock()

如果此接收器是会话接收器,则设置会话的状态。

Returns:

会话锁的下一个过期时间。

renewSessionLock

public void renewSessionLock(Duration maxLockRenewalDuration, Consumer onError)

启动此接收方工作的会话的自动锁定续订。

Parameters:

maxLockRenewalDuration - 持续续订会话的最长持续时间。
onError - 在锁续订期间发生错误时调用的函数。

rollbackTransaction

public void rollbackTransaction(ServiceBusTransactionContext transactionContext)

回滚给定的事务及其关联的所有操作。

创建和使用事务

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Parameters:

transactionContext - 要回滚的事务。

setSessionState

public void setSessionState(byte[] sessionState)

如果此接收器是会话接收器,则设置会话的状态。

Parameters:

sessionState - 要对会话设置的状态。

适用于