次の方法で共有


ServiceBusProcessorClient クラス

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusProcessorClient

実装

public final class ServiceBusProcessorClient
implements AutoCloseable

Service Bus メッセージを処理するためのプロセッサ クライアント。 ServiceBusProcessorClient は、メッセージの受信時にメッセージ処理コールバックを呼び出すプッシュベースのメカニズム、またはメッセージの受信時にエラーが発生したときにエラー ハンドラーを提供します。 を ServiceBusProcessorClient 作成して、セッションが有効な Service Bus エンティティまたはセッション対応でない Service Bus エンティティのメッセージを処理できます。 既定では、メッセージの自動決済がサポートされています。

プロセッサ クライアントをインスタンス化し、PeekLock モードで受信するサンプル コード

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
     // handling message reaches desired state such that it doesn't require Service Bus to redeliver
     // the same message, then context.complete() should be called otherwise context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

プロセッサ クライアントをインスタンス化し、ReceiveAndDelete モードで受信するサンプル コード

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();

セッション対応プロセッサを作成して実行する

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

メソッドの概要

修飾子と型 メソッドと説明
synchronized void close()

メッセージ処理を停止し、プロセッサを閉じます。

synchronized String getIdentifier()

のインスタンス ServiceBusProcessorClientの識別子を取得します。

String getQueueName()

のこのインスタンスに関連付けられているキュー名を ServiceBusProcessorClient返します。

String getSubscriptionName()

のこのインスタンスに関連付けられているサブスクリプション名を ServiceBusProcessorClient返します。

String getTopicName()

のこのインスタンスに関連付けられているトピック名を ServiceBusProcessorClient返します。

synchronized boolean isRunning()

プロセッサが true 実行されている場合は を返します。

synchronized void start()

バックグラウンドでプロセッサを起動します。

synchronized void stop()

このプロセッサのメッセージ処理を停止します。

メソッドの継承元: java.lang.Object

メソッドの詳細

close

public synchronized void close()

メッセージ処理を停止し、プロセッサを閉じます。 受信リンクとセッションは閉じられ、 を呼び出すと start() 、新しいリンクと新しいセッションを含む新しい処理サイクルが作成されます。

getIdentifier

public synchronized String getIdentifier()

のインスタンス ServiceBusProcessorClientの識別子を取得します。

Returns:

のインスタンス ServiceBusProcessorClientを識別できる識別子。

getQueueName

public String getQueueName()

のこのインスタンスに関連付けられているキュー名を ServiceBusProcessorClient返します。

Returns:

のこのインスタンス ServiceBusProcessorClient に関連付けられているキュー名。プロセッサ null インスタンスがトピックとサブスクリプション用の場合は 。

getSubscriptionName

public String getSubscriptionName()

のこのインスタンスに関連付けられているサブスクリプション名を ServiceBusProcessorClient返します。

Returns:

のこのインスタンス ServiceBusProcessorClient に関連付けられているサブスクリプション名。プロセッサ null インスタンスがキュー用の場合は 。

getTopicName

public String getTopicName()

のこのインスタンスに関連付けられているトピック名を ServiceBusProcessorClient返します。

Returns:

のこのインスタンス ServiceBusProcessorClient に関連付けられているトピック名。プロセッサ null インスタンスがキュー用の場合は 。

isRunning

public synchronized boolean isRunning()

プロセッサが true 実行されている場合は を返します。 プロセッサが停止または閉じている場合、このメソッドは を返します false

Returns:

true プロセッサが実行されている場合は 。 false それ以外の場合は 。

start

public synchronized void start()

バックグラウンドでプロセッサを起動します。 このメソッドが呼び出されると、プロセッサは、新しいメッセージが使用可能になったときにメッセージ ハンドラーを呼び出すメッセージ レシーバーを開始します。 このメソッドはべき等です (つまり、プロセッサが既に実行されている後にもう一度呼び出すことは start() 操作なしです)。

呼び出し後に を呼び出start()stop()すと、基になる同じ接続を使用してメッセージの処理が再開されます。

を呼び出した後に を呼び start()close() すと、新しい接続でプロセッサが起動します。

stop

public synchronized void stop()

このプロセッサのメッセージ処理を停止します。 受信リンクとセッションはアクティブな状態に保たれ、このプロセッサはもう一度 を呼び出 start() すことによってメッセージの処理を再開できます。

適用対象