ServiceBusClientBuilder.ServiceBusProcessorClientBuilder クラス
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusClientBuilder. ServiceBusProcessorClientBuilder
- com.
public final class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder
Service Bus エンティティからのメッセージを ServiceBusProcessorClient 使用するを作成するためのビルダー。 ServiceBusProcessorClient は、メッセージの受信時にメッセージ処理コールバックを通知するプッシュベースのメカニズム、またはエラーが観察されたときにエラー ハンドルを提供します。 したがって、インスタンスを作成するには、 と processError(Consumer<ServiceBusErrorContext> processError) という 2 つのコールバックをprocessMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)構成する必要があります。 既定では、 ServiceBusProcessorClient は自動補完機能と自動ロック更新機能で構成されています。
プロセッサ クライアントをインスタンス化し、PeekLock モードで受信するサンプル コード
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
// Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
// handling message reaches desired state such that it doesn't require Service Bus to redeliver
// the same message, then context.complete() should be called otherwise context.abandon().
final boolean success = Math.random() < 0.5;
if (success) {
try {
context.complete();
} catch (RuntimeException error) {
System.out.printf("Completion of the message %s failed.%n Error: %s%n",
message.getMessageId(), error);
}
} else {
try {
context.abandon();
} catch (RuntimeException error) {
System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
message.getMessageId(), error);
}
}
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
プロセッサ クライアントをインスタンス化し、ReceiveAndDelete モードで受信するサンプル コード
// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
final ServiceBusReceivedMessage message = context.getMessage();
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
message.getSessionId(), message.getSequenceNumber(), message.getBody());
};
// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException exception = (ServiceBusException) errorContext.getException();
System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
exception.getReason());
} else {
System.out.printf("Error occurred: %s%n", errorContext.getException());
}
};
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, tokenCredential)
.processor()
.queueName(queueName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.processMessage(processMessage)
.processError(processError)
.disableAutoComplete()
.buildProcessorClient();
// Starts the processor in the background. Control returns immediately.
processorClient.start();
// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
メソッドの概要
メソッドの継承元: java.lang.Object
メソッドの詳細
buildProcessorClient
public ServiceBusProcessorClient buildProcessorClient()
特定のキューまたはサブスクリプションからの読み取 ServiceBusReceivedMessage りを担当する Service Bus メッセージ プロセッサを作成します。
Returns:
disableAutoComplete
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder disableAutoComplete()
受信したメッセージの自動完了と自動破棄を無効にします。 既定では、正常に処理されたメッセージは です complete()。 メッセージの処理中にエラーが発生した場合は、 です abandon()。
Returns:
maxAutoLockRenewDuration
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
ロックの自動更新を続行する時間を設定します。 自動更新を設定 Duration#ZERO または null
無効にします。 モードの場合 RECEIVE_AND_DELETE 、自動更新は無効になっています。
Parameters:
null
は、自動更新が無効になっていることを示します。
Returns:
maxConcurrentCalls
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)
このプロセッサで処理する必要がある同時実行メッセージの最大数。 既定値は 1 です。
Parameters:
Returns:
prefetchCount
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)
プロセッサのプリフェッチ数を設定します。 モードと RECEIVE_AND_DELETE モードの両方PEEK_LOCKの場合、既定値は 0 です。 プリフェッチは、アプリケーションがプロセッサを起動する前に、ローカルの取得にメッセージをすぐに使用できるようにすることを目的として、メッセージ フローを高速化します。 0 以外の値を設定すると、その数のメッセージがプリフェッチされます。 値を 0 に設定すると、プリフェッチがオフになります。
Parameters:
Returns:
processError
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processError(Consumer
メッセージの受信中にエラーが発生した場合に呼び出されるプロセッサのエラー ハンドラー。
Parameters:
Returns:
processMessage
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processMessage(Consumer
メッセージの受信時に実行されるプロセッサのメッセージ処理コールバック。
Parameters:
Returns:
queueName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder queueName(String queueName)
プロセッサを作成するキューの名前を設定します。
Parameters:
Returns:
receiveMode
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)
プロセッサの受信モードを設定します。
Parameters:
Returns:
subQueue
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)
接続する の SubQueue 種類を設定します。 Azure Service Bus キューとサブスクリプションは、配信不能キュー (DLQ) と呼ばれるセカンダリ サブキューを提供します。
Parameters:
Returns:
subscriptionName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)
リッスンするトピック内のサブスクリプションの名前を設定します。 topicName(String topicName) も設定する必要があります。
Parameters:
Returns:
topicName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder topicName(String topicName)
トピックの名前を設定します。 subscriptionName(String subscriptionName) も設定する必要があります。
Parameters:
Returns:
適用対象
Azure SDK for Java