你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusSessionReceiverClient 类

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusSessionReceiverClient

实现

public final class ServiceBusSessionReceiverClient
implements AutoCloseable

同步 会话接收方客户端用于从队列或主题获取会话锁,并创建 ServiceBusReceiverClient 绑定到锁定会话的实例。 会话可用作先入先出 (FIFO) 消息处理。 队列和主题/订阅支持服务总线会话,但是,必须在 创建实体时启用它。

本文档中显示的示例使用名为 DefaultAzureCredential 的凭据对象进行身份验证,该对象适用于大多数方案,包括本地开发和生产环境。 此外,我们建议使用 托管标识 在生产环境中进行身份验证。 可以在 Azure 标识文档中找到有关不同身份验证方式及其相应凭据类型的详细信息。

示例:从特定会话接收消息

如果知道会话 ID,则使用 acceptSession(String sessionId) 获取会话的锁。 PEEK_LOCK强烈建议 用户控制消息解决。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .queueName(sessionEnabledQueueName)
     .disableAutoComplete()
     .buildClient();
 ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<<my-session-id>>");

 // Keep fetching messages from the session until there are no more messages.
 // The receiveMessage operation returns when either 10 messages have been receiver or, 30 seconds have elapsed.
 boolean hasMoreMessages = true;
 while (hasMoreMessages) {
     IterableStream<ServiceBusReceivedMessage> messages =
         receiver.receiveMessages(10, Duration.ofSeconds(30));
     Iterator<ServiceBusReceivedMessage> iterator = messages.iterator();
     hasMoreMessages = iterator.hasNext();

     while (iterator.hasNext()) {
         ServiceBusReceivedMessage message = iterator.next();
         System.out.printf("Session Id: %s. Contents: %s%n.", message.getSessionId(), message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             receiver.complete(message);
         } else {
             receiver.abandon(message);
         }
     }
 }

 // Use the receiver and finally close it along with the sessionReceiver.
 receiver.close();
 sessionReceiver.close();

示例:从第一个可用会话接收消息

使用 acceptNextSession() 获取下一个可用会话的锁,而不指定会话 ID。 PEEK_LOCK强烈建议用户 控制消息解决。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // throws a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession();

 // Use the receiver and finally close it along with the sessionReceiver.
 try {
     // Keep fetching messages from the session until there are no more messages.
     // The receiveMessage operation returns when either 10 messages have been receiver or, 30 seconds have elapsed.
     boolean hasMoreMessages = true;
     while (hasMoreMessages) {
         IterableStream<ServiceBusReceivedMessage> messages =
             receiver.receiveMessages(10, Duration.ofSeconds(30));
         Iterator<ServiceBusReceivedMessage> iterator = messages.iterator();
         hasMoreMessages = iterator.hasNext();

         while (iterator.hasNext()) {
             ServiceBusReceivedMessage message = iterator.next();
             System.out.printf("Session Id: %s. Message: %s%n.", message.getSessionId(), message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 receiver.complete(message);
             } else {
                 receiver.abandon(message);
             }
         }
     }
 } finally {
     receiver.close();
     sessionReceiver.close();
 }

方法摘要

修饰符和类型 方法和描述
ServiceBusReceiverClient acceptNextSession()

获取下一个可用会话的会话锁,并创建 ServiceBusReceiverClient 用于从会话接收消息的 。

ServiceBusReceiverClient acceptSession(String sessionId)

获取 的 sessionId 会话锁,并创建 以 ServiceBusReceiverClient 从会话接收消息。

void close()

方法继承自 java.lang.Object

方法详细信息

acceptNextSession

public ServiceBusReceiverClient acceptNextSession()

获取下一个可用会话的会话锁,并创建 ServiceBusReceiverClient 用于从会话接收消息的 。 如果没有人立即可用,它将等待会话可用。

Returns:

绑定到 ServiceBusReceiverClient 可用会话的 。

acceptSession

public ServiceBusReceiverClient acceptSession(String sessionId)

获取 的 sessionId 会话锁,并创建 以 ServiceBusReceiverClient 从会话接收消息。 如果会话已被其他客户端锁定, AmqpException 则会立即引发 。

Parameters:

sessionId - 会话 ID。

Returns:

绑定到 ServiceBusReceiverClient 指定会话的 。

close

public void close()

适用于