次の方法で共有


ServiceBusReceiverAsyncClient クラス

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient

実装

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable

Azure Service Bus キューまたはトピック/サブスクリプションからの受信を担当するServiceBusReceivedMessage非同期レシーバー。

このドキュメントに示す例では、認証に DefaultAzureCredential という名前の資格情報オブジェクトを使用します。これは、ローカルの開発環境や運用環境など、ほとんどのシナリオに適しています。 さらに、運用環境での認証には マネージド ID を 使用することをお勧めします。 さまざまな認証方法と、それに対応する資格情報の種類の詳細については、 Azure Identity のドキュメントを参照してください

サンプル: を作成する ServiceBusReceiverAsyncClient

次のコード サンプルは、非同期クライアント ServiceBusReceiverAsyncClientの作成を示しています。 は fullyQualifiedNamespace Service Bus 名前空間のホスト名です。 Azure Portal を介して Event Hubs 名前空間に移動した後、"Essentials" パネルの下に一覧表示されます。 使用される資格情報は、 DefaultAzureCredential デプロイと開発でよく使用される資格情報を組み合わせ、実行環境に基づいて使用する資格情報を選択するためです。 PEEK_LOCK (既定の受信モード) と disableAutoComplete() は、ユーザーがメッセージの受け取りを制御できるように 強くお 勧めします。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

サンプル: Service Bus リソースからすべてのメッセージを受信する

これにより、Service Bus からメッセージの無限ストリームが返されます。 ストリームは、サブスクリプションが破棄されるか、またはその他のターミナル シナリオで終了します。 詳細については、「 receiveMessages() 」を参照してください。

// Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
 Disposable subscription = asyncReceiver.receiveMessages()
     .flatMap(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return asyncReceiver.complete(message);
         } else {
             return asyncReceiver.abandon(message);
         }
     })
     .subscribe(unused -> {
     }, error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 asyncReceiver.close();

サンプル: Service Bus エンティティからモードで RECEIVE_AND_DELETE メッセージを受信する

次のコード サンプルは、 を使用した非同期クライアント ServiceBusReceiverAsyncClient の作成を RECEIVE_AND_DELETE示しています。 は fullyQualifiedNamespace Service Bus 名前空間のホスト名です。 Azure Portal を介して Event Hubs 名前空間に移動した後、"Essentials" パネルの下に一覧表示されます。 使用される資格情報は、 DefaultAzureCredential デプロイと開発でよく使用される資格情報を組み合わせ、実行環境に基づいて使用する資格情報を選択するためです。 RECEIVE_AND_DELETEこのモードを使用したメッセージの受信の詳細については、ドキュメントを参照してください。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = Flux.usingWhen(
         Mono.fromCallable(() -> {
             // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
             // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
             return new ServiceBusClientBuilder()
                 .credential(fullyQualifiedNamespace, credential)
                 .receiver()
                 .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                 .queueName(queueName)
                 .buildAsyncClient();
         }), receiver -> {
             return receiver.receiveMessages();
         }, receiver -> {
             return Mono.fromRunnable(() -> receiver.close());
         })
     .subscribe(message -> {
             // Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
             // removed from the queue.
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

サンプル: 特定のセッションからメッセージを受信する

特定のセッションからメッセージをフェッチするには、 に ServiceBusSessionReceiverClientBuilder 切り替えて、セッション レシーバー クライアントをビルドします。 を使用して acceptSession(String sessionId) 、セッションバインド ServiceBusReceiverAsyncClientを作成します。 このサンプルでは、 キューの作成時に Service Bus セッションが有効になっていることを前提としています。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

サンプル: 使用可能な最初のセッションからメッセージを受信する

最初に使用可能なセッションからのメッセージを処理するには、 に ServiceBusSessionReceiverClientBuilder 切り替えて、セッション レシーバー クライアントをビルドします。 を使用して acceptNextSession() 、メッセージを処理する最初の使用可能なセッションを見つけます。

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

サンプル: Service Bus エンティティからのメッセージのレート制限の使用

特定の時刻に受信するメッセージの数を制限する必要があるメッセージ レシーバーの場合は、 を使用 BaseSubscriber#request(long)できます。

// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
 asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 message at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });

メソッドの概要

修飾子と型 メソッドと説明
Mono<Void> abandon(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage破棄します。

Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)

を破棄すると、 ServiceBusReceivedMessage メッセージのプロパティが更新されます。

void close()

サービスへの基になるリンクを閉じて、コンシューマーを破棄します。

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

トランザクションとそのトランザクションに関連付けられているすべての操作をコミットします。

Mono<Void> complete(ServiceBusReceivedMessage message)

を完了します ServiceBusReceivedMessage

Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)

指定したオプションを ServiceBusReceivedMessage 使用して を完了します。

Mono<ServiceBusTransactionContext> createTransaction()

新しいサービス側トランザクションを開始します。

Mono<Void> deadLetter(ServiceBusReceivedMessage message)

ServiceBusReceivedMessageを配信不能サブキューに移動します。

Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

ServiceBusReceivedMessage指定したオプションを使用して、 を配信不能サブキューに移動します。

Mono<Void> defer(ServiceBusReceivedMessage message)

を延期します ServiceBusReceivedMessage

Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)

オプションを ServiceBusReceivedMessage 設定して を延期します。

String getEntityPath()

このクライアントが操作する Service Bus リソースを取得します。

String getFullyQualifiedNamespace()

接続が関連付けられている完全修飾 Service Bus 名前空間を取得します。

String getIdentifier()

のインスタンス ServiceBusReceiverAsyncClientの識別子を取得します。

String getSessionId()

このレシーバーがセッションレシーバーの場合は、セッションのセッション ID を取得します。

Mono<byte[]> getSessionState()

このレシーバーがセッション レシーバーの場合は、セッションの状態を取得します。

Mono<ServiceBusReceivedMessage> peekMessage()

受信側またはメッセージ ソースの状態を変更せずに、次のアクティブなメッセージを読み取ります。

Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次を読み取ります。

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)

受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次のバッチを読み取ります。

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ・ソースの状態を変更せずに、アクティブ・メッセージの次のバッチを読み取ります。

Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)

遅延 ServiceBusReceivedMessageを受け取ります。

Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)

遅延 ServiceBusReceivedMessageのバッチを受け取ります。

Flux<ServiceBusReceivedMessage> receiveMessages()

Service Bus エンティティから のServiceBusReceivedMessage無限ストリームを受信します。

Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)

メッセージのロックを非同期的に更新します。

Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

の自動ロック更新を ServiceBusReceivedMessage開始します。

Mono<OffsetDateTime> renewSessionLock()

このレシーバーがセッション レシーバーの場合は、セッション ロックを更新します。

Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)

このレシーバーが動作するセッションの自動ロック更新を開始します。

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

指定されたトランザクションと、それに関連付けられているすべての操作をロールバックします。

Mono<Void> setSessionState(byte[] sessionState)

受信側が動作するセッションの状態を設定します。

メソッドの継承元: java.lang.Object

メソッドの詳細

abandon

public Mono abandon(ServiceBusReceivedMessage message)

ServiceBusReceivedMessage破棄します。 これにより、メッセージを再び処理できるようになります。 メッセージを破棄すると、メッセージの配信数が増えます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

Returns:

Mono Service Bus の破棄操作が完了したときに完了する 。

abandon

public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options)

を破棄すると、 ServiceBusReceivedMessage メッセージのプロパティが更新されます。 これにより、メッセージを再び処理できるようになります。 メッセージを破棄すると、メッセージの配信数が増えます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージを破棄するときに設定するオプション。

Returns:

Mono Service Bus 操作が完了したときに完了する 。

close

public void close()

サービスへの基になるリンクを閉じて、コンシューマーを破棄します。

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

トランザクションとそのトランザクションに関連付けられているすべての操作をコミットします。

トランザクションの作成と使用

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - コミットするトランザクション。

Returns:

Mono Service Bus リソースでこの操作を完了する 。

complete

public Mono complete(ServiceBusReceivedMessage message)

を完了します ServiceBusReceivedMessage。 これにより、サービスからメッセージが削除されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

Returns:

Mono Service Bus でメッセージが完了したときに終了する 。

complete

public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options)

指定したオプションを ServiceBusReceivedMessage 使用して を完了します。 これにより、サービスからメッセージが削除されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージを完了するために使用されるオプション。

Returns:

Mono Service Bus でメッセージが完了したときに終了する 。

createTransaction

public Mono createTransaction()

新しいサービス側トランザクションを開始します。 は ServiceBusTransactionContext 、このトランザクションに含める必要があるすべての操作に渡す必要があります。

トランザクションの作成と使用

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Returns:

Mono Service Bus リソースでこの操作を完了する 。

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message)

ServiceBusReceivedMessageを配信不能サブキューに移動します。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

Returns:

Mono配信不能操作が完了したときに完了する 。

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

ServiceBusReceivedMessage指定したオプションを使用して、 を配信不能サブキューに移動します。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージの配信不能に使用されるオプション。

Returns:

Mono配信不能操作が完了したときに完了する 。

defer

public Mono defer(ServiceBusReceivedMessage message)

を延期します ServiceBusReceivedMessage。 これにより、メッセージが遅延サブキューに移動されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。

Returns:

Mono Service Bus の遅延操作が完了したときに完了する 。

defer

public Mono defer(ServiceBusReceivedMessage message, DeferOptions options)

オプションを ServiceBusReceivedMessage 設定して を延期します。 これにより、メッセージが遅延サブキューに移動されます。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
options - メッセージを延期するために使用されるオプション。

Returns:

Mono遅延操作が完了したときに完了する 。

getEntityPath

public String getEntityPath()

このクライアントが操作する Service Bus リソースを取得します。

Returns:

このクライアントが操作する Service Bus リソース。

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

接続が関連付けられている完全修飾 Service Bus 名前空間を取得します。 これは、 と似ている可能性があります {yournamespace}.servicebus.windows.net

Returns:

接続が関連付けられている完全修飾 Service Bus 名前空間。

getIdentifier

public String getIdentifier()

のインスタンス ServiceBusReceiverAsyncClientの識別子を取得します。

Returns:

のインスタンス ServiceBusReceiverAsyncClientを識別できる識別子。

getSessionId

public String getSessionId()

このレシーバーがセッション レシーバーの場合は、セッションの SessionId を取得します。

Returns:

セッション レシーバーでない場合は、SessionId または null。

getSessionState

public Mono getSessionState()

このレシーバーがセッション レシーバーの場合は、セッションの状態を取得します。

Returns:

セッションの状態、またはセッションに状態が設定されていない場合は空の Mono。

peekMessage

public Mono peekMessage()

受信側またはメッセージ ソースの状態を変更せずに、次のアクティブなメッセージを読み取ります。 を最初に peek() 呼び出すと、この受信側の最初のアクティブ メッセージがフェッチされます。 後続の呼び出しのたびに、エンティティ内の後続のメッセージがフェッチされます。

Returns:

ピークされた ServiceBusReceivedMessage

peekMessage

public Mono peekMessage(long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次を読み取ります。

Parameters:

sequenceNumber - メッセージの読み取り元のシーケンス番号。

Returns:

ピークされた ServiceBusReceivedMessage

peekMessages

public Flux peekMessages(int maxMessages)

受信側またはメッセージ ソースの状態を変更せずに、アクティブなメッセージの次のバッチを読み取ります。

Parameters:

maxMessages - メッセージ数。

Returns:

Fluxピークされる の ServiceBusReceivedMessage

peekMessages

public Flux peekMessages(int maxMessages, long sequenceNumber)

指定したシーケンス番号から、受信側またはメッセージ・ソースの状態を変更せずに、アクティブ・メッセージの次のバッチを読み取ります。

Parameters:

maxMessages - メッセージ数。
sequenceNumber - メッセージの読み取りを開始する場所のシーケンス番号。

Returns:

Fluxピークされた の ServiceBusReceivedMessage

receiveDeferredMessage

public Mono receiveDeferredMessage(long sequenceNumber)

遅延 ServiceBusReceivedMessageを受け取ります。 遅延メッセージは、シーケンス番号を使用してのみ受信できます。

Parameters:

sequenceNumber - メッセージの getSequenceNumber()

Returns:

一致する sequenceNumberを含む遅延メッセージ。

receiveDeferredMessages

public Flux receiveDeferredMessages(Iterable sequenceNumbers)

遅延 ServiceBusReceivedMessageのバッチを受け取ります。 遅延メッセージは、シーケンス番号を使用してのみ受信できます。

Parameters:

sequenceNumbers - 遅延メッセージのシーケンス番号。

Returns:

Flux遅延した ServiceBusReceivedMessageの 。

receiveMessages

public Flux receiveMessages()

Service Bus エンティティから のServiceBusReceivedMessage無限ストリームを受信します。 この Flux は、次のいずれかになるまで、Service Bus エンティティからメッセージを継続的に受信します。

  • 受信側が閉じています。
  • Flux のサブスクリプションは破棄されます。
  • ダウンストリーム サブスクライバーからのターミナル信号がアップストリームに伝達されます (つまり、 Flux#take(long)Flux#take(Duration))。
  • AmqpException受信リンクが停止する原因となる が発生します。

クライアントは、その下にある AMQP リンクを使用してメッセージを受信します。現在の AMQP リンクで再トリable エラーが発生した場合、クライアントは透過的に新しい AMQP リンクに移行します。 クライアントで再試行できないエラーが発生した場合、または再試行が終了すると、サブスクライバーの org.reactivestreams.Subscriber#onError(Throwable) ターミナル ハンドラーにこのエラーが通知されます。 ターミナル イベントの後に org.reactivestreams.Subscriber#onNext(Object) それ以上のメッセージは配信されません。アプリケーションは、受信を再開するために新しいクライアントを作成する必要があります。 古いクライアントの Flux を再サブスクライブしても効果はありません。

注: 再取得できないエラーのいくつかの例は、アプリケーションが存在しないキューへの接続を試みている、受信中にキューを削除または無効にする、ユーザーが Geo-DR を明示的に開始するなどです。 これらは、Service Bus がクライアントと通信して、再取得不可能なエラーが発生した特定のイベントです。

Returns:

Service Bus エンティティからのメッセージの 無限 ストリーム。

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message)

メッセージのロックを非同期的に更新します。 ロックは、エンティティで指定された設定に基づいて更新されます。 メッセージがモードで受信されると、メッセージはこの受信側インスタンスのサーバー上で PEEK_LOCK 、エンティティの作成中に指定された期間ロックされます (LockDuration)。 メッセージの処理にこの期間より長い時間が必要な場合は、ロックを更新する必要があります。 更新ごとに、ロックはエンティティの LockDuration 値にリセットされます。

Parameters:

message - ServiceBusReceivedMessage自動ロックの更新を実行する 。

Returns:

メッセージの新しい有効期限。

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

の自動ロック更新を ServiceBusReceivedMessage開始します。

Parameters:

message - ServiceBusReceivedMessageこの操作を実行する 。
maxLockRenewalDuration - ロック トークンの更新を続ける最大期間。

Returns:

までメッセージの更新操作が完了したときに完了 maxLockRenewalDurationする Mono。

renewSessionLock

public Mono renewSessionLock()

このレシーバーがセッション レシーバーの場合は、セッション ロックを更新します。

Returns:

セッション ロックの次の有効期限。

renewSessionLock

public Mono renewSessionLock(Duration maxLockRenewalDuration)

このレシーバーが動作するセッションの自動ロック更新を開始します。

Parameters:

maxLockRenewalDuration - セッション ロックの更新を続ける最大期間。

Returns:

メッセージのロック更新操作。

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

指定されたトランザクションと、それに関連付けられているすべての操作をロールバックします。

トランザクションの作成と使用

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - ロールバックするトランザクション。

Returns:

Mono Service Bus リソースでこの操作を完了する 。

setSessionState

public Mono setSessionState(byte[] sessionState)

受信側が動作するセッションの状態を設定します。

Parameters:

sessionState - セッションで設定する状態。

Returns:

セッションが設定されたときに完了する Mono

適用対象