你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusReceiverAsyncClient 类
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusReceiverAsyncClient
- com.
实现
public final class ServiceBusReceiverAsyncClient
implements AutoCloseable
负责从Azure 服务总线队列或主题/订阅接收ServiceBusReceivedMessage的异步接收方。
本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,我们建议使用 托管标识 在生产环境中进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。
示例:创建 ServiceBusReceiverAsyncClient
以下代码示例演示如何创建异步客户端 ServiceBusReceiverAsyncClient。 fullyQualifiedNamespace
是服务总线命名空间的主机名。 通过 Azure 门户导航到事件中心命名空间后,它列在“概要”面板下。 使用的凭据是 DefaultAzureCredential
因为它合并了部署和开发中常用的凭据,并根据其运行环境选择要使用的凭据。 PEEK_LOCK (默认接收模式) ,disableAutoComplete()强烈建议用户控制消息解决。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.disableAutoComplete()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
示例:从服务总线资源接收所有消息
这会从服务总线返回无限的消息流。 当释放订阅或其他终端方案时,流结束。 有关详细信息,请参阅receiveMessages()。
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
// Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
Disposable subscription = asyncReceiver.receiveMessages()
.flatMap(message -> {
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return asyncReceiver.complete(message);
} else {
return asyncReceiver.abandon(message);
}
})
.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
// When program ends, or you're done receiving all messages, dispose of the receiver.
// Clients should be long-lived objects as they
// require resources and time to establish a connection to the service.
asyncReceiver.close();
示例:从服务总线实体以模式 RECEIVE_AND_DELETE 接收消息
下面的代码示例演示如何使用 RECEIVE_AND_DELETE创建异步客户端ServiceBusReceiverAsyncClient。 fullyQualifiedNamespace
是服务总线命名空间的主机名。 通过 Azure 门户导航到事件中心命名空间后,它列在“概要”面板下。 使用的凭据是 DefaultAzureCredential
因为它合并了部署和开发中常用的凭据,并根据其运行环境选择要使用的凭据。 有关使用此模式接收消息的详细信息,请参阅 RECEIVE_AND_DELETE 文档。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
Disposable subscription = Flux.usingWhen(
Mono.fromCallable(() -> {
// Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
// peek lock mode is used. In peek lock mode, users are responsible for settling messages.
return new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.queueName(queueName)
.buildAsyncClient();
}), receiver -> {
return receiver.receiveMessages();
}, receiver -> {
return Mono.fromRunnable(() -> receiver.close());
})
.subscribe(message -> {
// Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
// removed from the queue.
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
},
error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
示例:从特定会话接收消息
若要从特定会话提取消息,请切换到 ServiceBusSessionReceiverClientBuilder 并生成会话接收方客户端。 使用 acceptSession(String sessionId) 创建会话绑定 ServiceBusReceiverAsyncClient的 。 此示例假定在 创建队列时启用了服务总线会话。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
示例:从第一个可用会话接收消息
若要处理来自第一个可用会话的消息,请切换到 ServiceBusSessionReceiverClientBuilder 并生成会话接收方客户端。 使用 acceptNextSession() 查找要处理消息的第一个可用会话。
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
示例:限制来自服务总线实体的消息的速率消耗
对于需要限制在给定时间接收的消息数的消息接收者,可以使用 BaseSubscriber#request(long)。
// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
private static final int NUMBER_OF_MESSAGES = 5;
private final AtomicInteger currentNumberOfMessages = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 message at a time.
request(NUMBER_OF_MESSAGES);
}
@Override
protected void hookOnNext(ServiceBusReceivedMessage message) {
// Process the ServiceBusReceivedMessage
// If the number of messages we have currently received is a multiple of 5, that means we have reached
// the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
// that the subscriber is ready to get more messages from upstream.
if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_MESSAGES);
}
}
});
方法摘要
方法继承自 java.lang.Object
方法详细信息
abandon
public Mono
放弃 ServiceBusReceivedMessage。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。
Parameters:
Returns:
abandon
public Mono
ServiceBusReceivedMessage放弃更新消息的属性。 这会使消息再次可供处理。 放弃邮件会增加邮件的传递计数。
Parameters:
Returns:
close
public void close()
通过关闭指向服务的基础链接释放使用者。
commitTransaction
public Mono
提交事务及其关联的所有操作。
创建和使用事务
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
complete
public Mono
完成 ServiceBusReceivedMessage。 这会从服务中删除消息。
Parameters:
Returns:
complete
public Mono
ServiceBusReceivedMessage使用给定选项完成 。 这会从服务中删除消息。
Parameters:
Returns:
createTransaction
public Mono
启动新的服务端事务。 ServiceBusTransactionContext应将 传递给需要在此事务中的所有操作。
创建和使用事务
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Returns:
deadLetter
public Mono
将 移动到 ServiceBusReceivedMessage 死信子队列。
Parameters:
Returns:
deadLetter
public Mono
使用给定选项将 移动到 ServiceBusReceivedMessage 死信子队列。
Parameters:
Returns:
defer
public Mono
ServiceBusReceivedMessage延迟 。 这会将消息移动到延迟的子队列中。
Parameters:
Returns:
defer
public Mono
延迟并 ServiceBusReceivedMessage 设置选项。 这会将消息移动到延迟的子队列中。
Parameters:
Returns:
getEntityPath
public String getEntityPath()
获取此客户端与之交互的服务总线资源。
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
获取与连接关联的完全限定的服务总线命名空间。 这可能类似于 {yournamespace}.servicebus.windows.net
。
Returns:
getIdentifier
public String getIdentifier()
获取 实例的 ServiceBusReceiverAsyncClient标识符。
Returns:
getSessionId
public String getSessionId()
如果此接收方是会话接收器,则获取会话的 SessionId。
Returns:
getSessionState
public Mono
如果此接收器是会话接收器,则获取会话的状态。
Returns:
peekMessage
public Mono
在不更改接收方或消息源的状态的情况下读取下一个活动消息。 对 的第一次调用 peek()
提取此接收方的第一条活动消息。 每个后续调用都会提取实体中的后续消息。
Returns:
peekMessage
public Mono
从给定序列号开始,在不更改接收方或消息源的状态的情况下,读取活动消息的下一步。
Parameters:
Returns:
peekMessages
public Flux
在不更改接收方或消息源的状态的情况下读取下一批活动消息。
Parameters:
Returns:
peekMessages
public Flux
从给定序列号开始,在不更改接收方或消息源的状态的情况下读取下一批活动消息。
Parameters:
Returns:
receiveDeferredMessage
public Mono
接收延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。
Parameters:
Returns:
sequenceNumber
的延迟消息。receiveDeferredMessages
public Flux
接收一批延迟 ServiceBusReceivedMessage的 。 只能使用序列号接收延迟的消息。
Parameters:
Returns:
receiveMessages
public Flux
从服务总线实体接收 的ServiceBusReceivedMessage无限流。 此 Flux 持续接收来自服务总线实体的消息,直到:
- 接收器已关闭。
- 释放了 Flux 的订阅。
- 来自下游订阅服务器的终端信号上游 (即传播。 Flux#take(long) 或 Flux#take(Duration))。
- 导致 AmqpException 接收链接停止的 。
客户端使用下面的 AMQP 链接来接收消息;如果当前链接遇到可重试的错误,客户端将以透明方式转换为新的 AMQP 链接。 当客户端遇到不可重试的错误或重试用完时,订阅服务器的 org.reactivestreams.Subscriber#onError(Throwable) 终端处理程序将收到此错误的通知。 终端事件后不会再将消息传递到 org.reactivestreams.Subscriber#onNext(Object) ;应用程序必须创建一个新客户端才能恢复接收。 重新订阅旧客户端的 Flux 将不起作用。
注意:不可重试错误的一些示例包括 - 应用程序尝试连接到不存在的队列,在接收时删除或禁用队列,用户显式启动异地 DR。 这些是服务总线向客户端传达发生不可重试错误的某些事件。
Returns:
renewMessageLock
public Mono
异步续订消息的锁。 将根据实体上指定的设置续订锁。 当在 PEEK_LOCK 模式下收到消息时,此接收方实例在服务器上锁定消息,在实体创建期间 (LockDuration) 中指定的持续时间。 如果消息的处理时间超过此持续时间,则需要续订锁。 对于每次续订,锁将重置为实体的 LockDuration 值。
Parameters:
Returns:
renewMessageLock
public Mono
启动 的 ServiceBusReceivedMessage自动锁定续订。
Parameters:
Returns:
maxLockRenewalDuration
完成的 Mono。renewSessionLock
public Mono
如果此接收方是会话接收器,则续订会话锁。
Returns:
renewSessionLock
public Mono
为接收方工作的会话启动自动锁定续订。
Parameters:
Returns:
rollbackTransaction
public Mono
回滚给定的事务及其关联的所有操作。
创建和使用事务
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
setSessionState
public Mono
设置此接收方用于的会话的状态。
Parameters:
Returns: