ServiceBusProcessorClient クラス
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusProcessorClient
- com.
実装
public final class ServiceBusProcessorClient
implements AutoCloseable
Service Bus メッセージを処理するためのプロセッサ クライアント。 ServiceBusProcessorClient は、メッセージの受信時にメッセージ処理コールバックを呼び出すプッシュベースのメカニズム、またはメッセージの受信時にエラーが発生したときにエラー ハンドラーを提供します。 を ServiceBusProcessorClient 作成して、セッションが有効な Service Bus エンティティまたはセッション対応でない Service Bus エンティティのメッセージを処理できます。 既定では、メッセージの自動決済がサポートされています。
プロセッサ クライアントをインスタンス化し、PeekLock モードで受信するサンプル コード
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
プロセッサ クライアントをインスタンス化し、ReceiveAndDelete モードで受信するサンプル コード
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
セッション対応プロセッサを作成して実行する
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
Consumer<ServiceBusErrorContext> onError = context -> {
System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
context.getFullyQualifiedNamespace(), context.getEntityPath());
if (context.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) context.getException();
System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", context.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.sessionProcessor()
.queueName(sessionEnabledQueueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete()
.maxConcurrentSessions(2)
.processMessage(onMessage)
.processError(onError)
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();
// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();
メソッドの概要
修飾子と型 | メソッドと説明 |
---|---|
synchronized void |
close()
メッセージ処理を停止し、プロセッサを閉じます。 |
synchronized String |
getIdentifier()
のインスタンス ServiceBusProcessorClientの識別子を取得します。 |
String |
getQueueName()
のこのインスタンスに関連付けられているキュー名を ServiceBusProcessorClient返します。 |
String |
getSubscriptionName()
のこのインスタンスに関連付けられているサブスクリプション名を ServiceBusProcessorClient返します。 |
String |
getTopicName()
のこのインスタンスに関連付けられているトピック名を ServiceBusProcessorClient返します。 |
synchronized boolean |
isRunning()
プロセッサが |
synchronized void |
start()
バックグラウンドでプロセッサを起動します。 |
synchronized void |
stop()
このプロセッサのメッセージ処理を停止します。 |
メソッドの継承元: java.lang.Object
メソッドの詳細
close
public synchronized void close()
メッセージ処理を停止し、プロセッサを閉じます。 受信リンクとセッションは閉じられ、 を呼び出すと start() 、新しいリンクと新しいセッションを含む新しい処理サイクルが作成されます。
getIdentifier
public synchronized String getIdentifier()
のインスタンス ServiceBusProcessorClientの識別子を取得します。
Returns:
getQueueName
public String getQueueName()
のこのインスタンスに関連付けられているキュー名を ServiceBusProcessorClient返します。
Returns:
null
インスタンスがトピックとサブスクリプション用の場合は 。getSubscriptionName
public String getSubscriptionName()
のこのインスタンスに関連付けられているサブスクリプション名を ServiceBusProcessorClient返します。
Returns:
null
インスタンスがキュー用の場合は 。getTopicName
public String getTopicName()
のこのインスタンスに関連付けられているトピック名を ServiceBusProcessorClient返します。
Returns:
null
インスタンスがキュー用の場合は 。isRunning
public synchronized boolean isRunning()
プロセッサが true
実行されている場合は を返します。 プロセッサが停止または閉じている場合、このメソッドは を返します false
。
Returns:
true
プロセッサが実行されている場合は 。 false
それ以外の場合は 。start
public synchronized void start()
バックグラウンドでプロセッサを起動します。 このメソッドが呼び出されると、プロセッサは、新しいメッセージが使用可能になったときにメッセージ ハンドラーを呼び出すメッセージ レシーバーを開始します。 このメソッドはべき等です (つまり、プロセッサが既に実行されている後にもう一度呼び出すことは start()
操作なしです)。
呼び出し後に を呼び出start()
stop()すと、基になる同じ接続を使用してメッセージの処理が再開されます。
を呼び出した後に を呼び start()
出 close() すと、新しい接続でプロセッサが起動します。
stop
public synchronized void stop()
このプロセッサのメッセージ処理を停止します。 受信リンクとセッションはアクティブな状態に保たれ、このプロセッサはもう一度 を呼び出 start() すことによってメッセージの処理を再開できます。
適用対象
Azure SDK for Java