次の方法で共有


ServiceBusReceiverClient クラス

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverClient

実装

public final class ServiceBusReceiverClient
implements AutoCloseable

Azure Service Busのキューまたはトピック/サブスクリプションからの受信をServiceBusReceivedMessage担当する同期レシーバー。

このドキュメントに示す例では、認証に DefaultAzureCredential という名前の資格情報オブジェクトを使用します。これは、ローカルの開発環境や運用環境を含むほとんどのシナリオに適しています。 さらに、運用環境での認証に マネージド ID を 使用することをお勧めします。 認証のさまざまな方法とそれに対応する資格情報の種類の詳細については、 Azure ID のドキュメントを参照してください

サンプル: 受信者を作成してメッセージを受信する

次のコード サンプルは、Service Bus サブスクリプションからメッセージを受信するための同期クライアント ServiceBusReceiverClient の作成と使用を示しています。 受信操作では、10 個のメッセージが受信されたか、30 秒が経過したときにが返されます。 既定では、 を使用して PEEK_LOCK メッセージを受信し、顧客は受信側クライアントの決済方法のいずれかを使用してメッセージを決済する必要があります。 "受信操作の決済" は、メッセージの受け取りに関する追加情報を提供します。

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .receiver()
     .topicName(topicName)
     .subscriptionName(subscriptionName)
     .buildClient();

 // Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
 // happens first.
 IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
 messages.forEach(message -> {
     System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());

     // If able to process message, complete it. Otherwise, abandon it and allow it to be
     // redelivered.
     if (isMessageProcessed) {
         receiver.complete(message);
     } else {
         receiver.abandon(message);
     }
 });

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 receiver.close();

メソッドの概要

修飾子と型 メソッドと説明
void abandon(ServiceBusReceivedMessage message)

を破棄します ServiceBusReceivedMessage

void abandon(ServiceBusReceivedMessage message, AbandonOptions options)

ServiceBusReceivedMessage 破棄し、メッセージのプロパティを更新します。

void close()

サービスへの基になるリンクを閉じて、コンシューマーを破棄します。

void commitTransaction(ServiceBusTransactionContext transactionContext)

トランザクションとそのトランザクションに関連付けられているすべての操作をコミットします。

void complete(ServiceBusReceivedMessage message)

を完了します ServiceBusReceivedMessage

void complete(ServiceBusReceivedMessage message, CompleteOptions options)

を完了します ServiceBusReceivedMessage

ServiceBusTransactionContext createTransaction()

Service Bus で新しいトランザクションを開始します。

void deadLetter(ServiceBusReceivedMessage message)

ServiceBusReceivedMessageを配信不能サブキューに移動します。

void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

ServiceBusReceivedMessage配信不能の理由、エラーの説明、または変更されたプロパティを使用して、 を配信不能サブキューに移動します。

void defer(ServiceBusReceivedMessage message)

を延期します ServiceBusReceivedMessage

void defer(ServiceBusReceivedMessage message, DeferOptions options)

変更されたメッセージ プロパティで ServiceBusReceivedMessage ロック トークンを使用して を遅延します。

String getEntityPath()

このクライアントが操作する Service Bus リソースを取得します。

String getFullyQualifiedNamespace()

接続が関連付けられている完全修飾 Service Bus 名前空間を取得します。

String getIdentifier()

のインスタンス ServiceBusReceiverClientの識別子を取得します。

String getSessionId()

このレシーバーがセッションレシーバーの場合は、セッションのセッション ID を取得します。

byte[] getSessionState()

このレシーバーがセッション レシーバーの場合は、セッションの状態を取得します。

ServiceBusReceivedMessage peekMessage()

受信側またはメッセージ ソースの状態を変更せずに、次のアクティブなメッセージを読み取ります。

ServiceBusReceivedMessage peekMessage(long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次を読み取ります。

IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages)

受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次のバッチを読み取ります。

IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ・ソースの状態を変更せずに、アクティブ・メッセージの次のバッチを読み取ります。

ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)

遅延 ServiceBusReceivedMessageを受け取ります。

IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers)

遅延 ServiceBusReceivedMessageのバッチを受け取ります。

IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages)

Service Bus エンティティから の許容ストリーム ServiceBusReceivedMessage を受信します。

IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages, Duration maxWaitTime)

Service Bus エンティティから の許容ストリーム ServiceBusReceivedMessage を受信します。

OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)

指定したメッセージのロックを更新します。

void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer<Throwable> onError)

指定されたロックを使用してメッセージの自動ロック更新を開始します。

OffsetDateTime renewSessionLock()

このレシーバーがセッション レシーバーの場合は、セッションの状態を設定します。

void renewSessionLock(Duration maxLockRenewalDuration, Consumer<Throwable> onError)

このレシーバーが動作するセッションの自動ロック更新を開始します。

void rollbackTransaction(ServiceBusTransactionContext transactionContext)

指定されたトランザクションと、それに関連付けられているすべての操作をロールバックします。

void setSessionState(byte[] sessionState)

このレシーバーがセッション レシーバーの場合は、セッションの状態を設定します。

メソッドの継承元: java.lang.Object

メソッドの詳細

abandon

public void abandon(ServiceBusReceivedMessage message)

を破棄します ServiceBusReceivedMessage。 これにより、メッセージを再び処理できるようになります。 メッセージを破棄すると、メッセージの配信数が増えます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

abandon

public void abandon(ServiceBusReceivedMessage message, AbandonOptions options)

ServiceBusReceivedMessage 破棄し、メッセージのプロパティを更新します。 これにより、メッセージを再び処理できるようになります。 メッセージを破棄すると、メッセージの配信数が増えます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージを破棄するときに設定するオプション。

close

public void close()

サービスへの基になるリンクを閉じて、コンシューマーを破棄します。

commitTransaction

public void commitTransaction(ServiceBusTransactionContext transactionContext)

トランザクションとそのトランザクションに関連付けられているすべての操作をコミットします。

トランザクションの作成と使用

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Parameters:

transactionContext - コミットするトランザクション。

complete

public void complete(ServiceBusReceivedMessage message)

を完了します ServiceBusReceivedMessage。 これにより、サービスからメッセージが削除されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

complete

public void complete(ServiceBusReceivedMessage message, CompleteOptions options)

を完了します ServiceBusReceivedMessage。 これにより、サービスからメッセージが削除されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージを完了するために使用されるオプション。

createTransaction

public ServiceBusTransactionContext createTransaction()

Service Bus で新しいトランザクションを開始します。 は ServiceBusTransactionContext 、このトランザクションに含める必要があるすべての操作に渡す必要があります。

サンプル: トランザクションの作成と使用

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Returns:

deadLetter

public void deadLetter(ServiceBusReceivedMessage message)

ServiceBusReceivedMessageを配信不能サブキューに移動します。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

deadLetter

public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

ServiceBusReceivedMessage配信不能の理由、エラーの説明、または変更されたプロパティを使用して、 を配信不能サブキューに移動します。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージの配信不能に使用されるオプション。

defer

public void defer(ServiceBusReceivedMessage message)

を延期します ServiceBusReceivedMessage。 これにより、メッセージが遅延サブキューに移動されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

defer

public void defer(ServiceBusReceivedMessage message, DeferOptions options)

変更されたメッセージ プロパティで ServiceBusReceivedMessage ロック トークンを使用して を遅延します。 これにより、メッセージが遅延サブキューに移動されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージを延期するために使用されるオプション。

getEntityPath

public String getEntityPath()

このクライアントが操作する Service Bus リソースを取得します。

Returns:

このクライアントが操作する Service Bus リソース。

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

接続が関連付けられている完全修飾 Service Bus 名前空間を取得します。 これは、 と似ている可能性があります {yournamespace}.servicebus.windows.net

Returns:

接続が関連付けられている完全修飾 Service Bus 名前空間。

getIdentifier

public String getIdentifier()

のインスタンス ServiceBusReceiverClientの識別子を取得します。

Returns:

のインスタンス ServiceBusReceiverClientを識別できる識別子。

getSessionId

public String getSessionId()

このレシーバーがセッション レシーバーの場合は、セッションの SessionId を取得します。

Returns:

セッション レシーバーでない場合は、SessionId または null。

getSessionState

public byte[] getSessionState()

このレシーバーがセッション レシーバーの場合は、セッションの状態を取得します。

Returns:

セッションの状態、またはセッションの状態が設定されていない場合は null。

peekMessage

public ServiceBusReceivedMessage peekMessage()

受信側またはメッセージ ソースの状態を変更せずに、次のアクティブなメッセージを読み取ります。 を最初に peekMessage() 呼び出すと、この受信側の最初のアクティブ メッセージがフェッチされます。 後続の呼び出しのたびに、エンティティ内の後続のメッセージがフェッチされます。

Returns:

ピークされた ServiceBusReceivedMessage

peekMessage

public ServiceBusReceivedMessage peekMessage(long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次を読み取ります。

Parameters:

sequenceNumber - メッセージの読み取り元のシーケンス番号。

Returns:

ピークされた ServiceBusReceivedMessage

peekMessages

public IterableStream peekMessages(int maxMessages)

受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次のバッチを読み取ります。

Parameters:

maxMessages - ピークするメッセージの最大数。

Returns:

IterableStream<T>ピークされている の ServiceBusReceivedMessage

peekMessages

public IterableStream peekMessages(int maxMessages, long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ・ソースの状態を変更せずに、アクティブ・メッセージの次のバッチを読み取ります。

Parameters:

maxMessages - メッセージ数。
sequenceNumber - メッセージの読み取りを開始する場所のシーケンス番号。

Returns:

receiveDeferredMessage

public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber)

遅延 ServiceBusReceivedMessageを受け取ります。 遅延メッセージは、シーケンス番号を使用してのみ受信できます。

Parameters:

sequenceNumber - メッセージの getSequenceNumber()

Returns:

一致する sequenceNumberを含む遅延メッセージ。

receiveDeferredMessageBatch

public IterableStream receiveDeferredMessageBatch(Iterable sequenceNumbers)

遅延 ServiceBusReceivedMessageのバッチを受け取ります。 遅延メッセージは、シーケンス番号を使用してのみ受信できます。

Parameters:

sequenceNumbers - 遅延メッセージのシーケンス番号。

Returns:

receiveMessages

public IterableStream receiveMessages(int maxMessages)

Service Bus エンティティから の許容ストリーム ServiceBusReceivedMessage を受信します。 受信操作は、メッセージを受信するまで既定の 1 分間待機してからタイムアウトします。を使用 receiveMessages(int maxMessages, Duration maxWaitTime)してオーバーライドできます。

クライアントは、その下にある AMQP リンクを使用してメッセージを受信します。現在の AMQP リンクで再トリable エラーが発生した場合、クライアントは透過的に新しい AMQP リンクに移行します。 クライアントで再試行不可能なエラーが発生した場合、または再試行が使い果たされると、receiveMessages API のそれ以上の呼び出しによって返された の繰り返し (forEach など) IterableStream<T> によって、エラーがアプリケーションにスローされます。 アプリケーションがこのエラーを受け取ったら、アプリケーションはクライアントをリセットする必要があります。つまり、現在 ServiceBusReceiverClient の クライアントを閉じて、メッセージの受信を続けるために新しいクライアントを作成します。

注: 再取得できないエラーのいくつかの例は、アプリケーションが存在しないキューへの接続を試みている、受信中にキューを削除または無効にする、ユーザーが Geo-DR を明示的に開始するなどです。 これらは、Service Bus がクライアントと通信して、再取得不可能なエラーが発生した特定のイベントです。

Parameters:

maxMessages - 受信するメッセージの最大数。

Returns:

IterableStream<T> Service Bus エンティティからの最大maxMessagesのメッセージの 。

receiveMessages

public IterableStream receiveMessages(int maxMessages, Duration maxWaitTime)

Service Bus エンティティから の許容ストリーム ServiceBusReceivedMessage を受信します。 の作成時に変更されない限り、既定のServiceBusReceiverClientServiceBusReceiverClientBuilder#receiveMode(ServiceBusReceiveMode)受信モードは PEEK_LOCK です。

クライアントは、その下にある AMQP リンクを使用してメッセージを受信します。現在の AMQP リンクで再トリable エラーが発生した場合、クライアントは透過的に新しい AMQP リンクに移行します。 クライアントで再試行不可能なエラーが発生した場合、または再試行が使い果たされると、receiveMessages API のそれ以上の呼び出しによって返された の繰り返し (forEach など) IterableStream<T> によって、エラーがアプリケーションにスローされます。 アプリケーションがこのエラーを受け取ったら、アプリケーションはクライアントをリセットする必要があります。つまり、現在 ServiceBusReceiverClient の クライアントを閉じて、メッセージの受信を続けるために新しいクライアントを作成します。

注: 再取得できないエラーのいくつかの例は、アプリケーションが存在しないキューへの接続を試みている、受信中にキューを削除または無効にする、ユーザーが Geo-DR を明示的に開始するなどです。 これらは、Service Bus がクライアントと通信して、再取得不可能なエラーが発生した特定のイベントです。

Parameters:

maxMessages - 受信するメッセージの最大数。
maxWaitTime - クライアントがメッセージの受信を待機してからタイムアウトするまでの時間。

Returns:

IterableStream<T> Service Bus エンティティからの最大maxMessagesのメッセージの 。

renewMessageLock

public OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message)

指定したメッセージのロックを更新します。 ロックは、エンティティで指定された設定に基づいて更新されます。 メッセージがモードで PEEK_LOCK 受信されると、キューの作成 (LockDuration) 中に指定された期間、この受信側インスタンスのサーバーでメッセージがロックされます。 メッセージの処理にこの期間より長い時間が必要な場合は、ロックを更新する必要があります。 更新ごとに、ロックはエンティティの LockDuration 値にリセットされます。

Parameters:

message - ロックの ServiceBusReceivedMessage 更新を実行する 。

Returns:

メッセージの新しい有効期限。

renewMessageLock

public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration, Consumer onError)

指定されたロックを使用してメッセージの自動ロック更新を開始します。

Parameters:

message - ServiceBusReceivedMessage自動ロックの更新を実行する 。
maxLockRenewalDuration - ロック トークンの更新を続ける最大期間。
onError - ロックの更新中にエラーが発生したときにを呼び出す関数。

renewSessionLock

public OffsetDateTime renewSessionLock()

このレシーバーがセッション レシーバーの場合は、セッションの状態を設定します。

Returns:

セッション ロックの次の有効期限。

renewSessionLock

public void renewSessionLock(Duration maxLockRenewalDuration, Consumer onError)

このレシーバーが動作するセッションの自動ロック更新を開始します。

Parameters:

maxLockRenewalDuration - セッションの更新を続ける最大期間。
onError - ロックの更新中にエラーが発生したときにを呼び出す関数。

rollbackTransaction

public void rollbackTransaction(ServiceBusTransactionContext transactionContext)

指定されたトランザクションと、それに関連付けられているすべての操作をロールバックします。

トランザクションの作成と使用

ServiceBusTransactionContext transaction = receiver.createTransaction();

 // Process messages and associate operations with the transaction.
 ServiceBusReceivedMessage deferredMessage = receiver.receiveDeferredMessage(sequenceNumber);
 receiver.complete(deferredMessage, new CompleteOptions().setTransactionContext(transaction));
 receiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction));
 receiver.commitTransaction(transaction);

Parameters:

transactionContext - ロールバックするトランザクション。

setSessionState

public void setSessionState(byte[] sessionState)

このレシーバーがセッション レシーバーの場合は、セッションの状態を設定します。

Parameters:

sessionState - セッションで設定する状態。

適用対象