ServiceBusReceiver クラス
ServiceBusReceiver クラスは、Azure Service Bus キューまたはトピック サブスクリプションからメッセージを受信するための高レベルのインターフェイスを定義します。
メッセージ受信の 2 つのプライマリ チャネルは、メッセージに対して 1 つの要求を行う receive() と、 受信側のメッセージの非同期チャネル です。継続的な方法で受信メッセージを継続的に受信します。
~azure.servicebus.aio.ServiceBusClient のメソッドを使用 get_<queue/subscription>_receiver
して、ServiceBusReceiver インスタンスを作成してください。
- 継承
-
ServiceBusReceiverazure.servicebus.aio._base_handler_async.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
コンストラクター
ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
パラメーター
- credential
- AsyncTokenCredential または AzureSasCredential または AzureNamedKeyCredential
トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れます。または、AzureSasCredential も指定できます。
- queue_name
- str
クライアントが接続する特定の Service Bus キューのパス。
- topic_name
- str
クライアントが接続するサブスクリプションを含む特定の Service Bus トピックのパス。
- subscription_name
- str
クライアントが接続する指定されたトピックの下にある特定の Service Bus サブスクリプションのパス。
- receive_mode
- Union[ServiceBusReceiveMode, str]
エンティティからメッセージを取得するモード。 2 つのオプションは、PEEK_LOCKとRECEIVE_AND_DELETEです。 PEEK_LOCKで受信したメッセージは、キューから削除される前に、特定のロック期間内に解決する必要があります。 RECEIVE_AND_DELETEで受信したメッセージはすぐにキューから削除され、クライアントがメッセージの処理に失敗した場合は、その後破棄または再受信することはできません。 既定のモードはPEEK_LOCKです。
受信したメッセージの間のタイムアウト (秒単位)。その後、受信側は自動的に受信を停止します。 既定値は None で、タイムアウトがないことを意味します。
- logging_enable
- bool
ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。
- transport_type
- TransportType
Service Bus サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。
- http_proxy
- Dict
HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'。
- user_agent
- str
指定した場合、これは組み込みのユーザー エージェント文字列の前に追加されます。
- auto_lock_renewer
- Optional[AutoLockRenewer]
メッセージが受信時に自動的に登録されるように、~azure.servicebus.aio.AutoLockRenewer を指定できます。 受信側がセッション レシーバーの場合は、代わりにセッションに適用されます。
- prefetch_count
- int
サービスへの各要求でキャッシュするメッセージの最大数。 この設定は、高度なパフォーマンス チューニングのみを目的としています。 この値を大きくすると、メッセージスループットのパフォーマンスが向上しますが、十分に高速に処理されていない場合は、メッセージがキャッシュされている間にメッセージが期限切れになる可能性が高くなります。 既定値は 0 です。つまり、メッセージはサービスから受信され、一度に 1 つずつ処理されます。 prefetch_countが 0 の場合、 ServiceBusReceiver.receive はサービスへの要求内 でmax_message_count (指定されている場合) をキャッシュしようとします。
- client_identifier
- str
クライアント インスタンスを一意に識別するための文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、いくつかのエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。
- socket_timeout
- float
接続の基になるソケットが、データの送受信時にタイムアウトするまで待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要がある場合があります。
変数
- fully_qualified_namespace
- str
Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。
- entity_path
- str
クライアントが接続するエンティティのパス。
メソッド
abandon_message |
メッセージを破棄します。 このメッセージはキューに返され、再び受信できるようになります。 |
close | |
complete_message |
メッセージを完了します。 これにより、キューからメッセージが削除されます。 |
dead_letter_message |
メッセージを配信不能キューに移動します。 配信不能キューは、正しく処理できなかったメッセージを格納するために使用できるサブキューです。それ以外の場合は、さらに検査または処理が必要です。 キューは、期限切れのメッセージを配信不能キューに送信するように構成することもできます。 |
defer_message |
メッセージを延期します。 このメッセージはキューに残りますが、受信するには、そのシーケンス番号で特に要求する必要があります。 |
peek_messages |
キューで現在保留中のメッセージを参照します。 ピークされたメッセージはキューから削除されず、ロックもされません。 完了、遅延、または配信不能にすることはできません。 |
receive_deferred_messages |
以前に延期されたメッセージを受信します。 パーティション分割されたエンティティから遅延メッセージを受信する場合は、指定されたすべてのシーケンス番号が同じパーティションからのメッセージである必要があります。 |
receive_messages |
一度にメッセージのバッチを受信します。 この方法は、複数のメッセージを同時に処理する場合や、アドホック受信を 1 回の呼び出しとして実行する場合に最適です。 1 つのバッチで取得されるメッセージの数は、受信側に prefetch_count が設定されているかどうかによって異なります。 受信側prefetch_count設定されていない場合、受信側はサービスへの要求内でメッセージmax_message_count (指定されている場合) をキャッシュしようとします。 この呼び出しは、指定されたバッチ サイズを満たすよりも迅速に返されることを優先するため、少なくとも 1 つのメッセージが受信され、指定されたバッチ サイズに関係なく受信メッセージにギャップが発生するとすぐにが返されます。 |
renew_message_lock |
メッセージ ロックを更新します。 これにより、メッセージが再処理されるキューに返されないように、メッセージのロックが維持されます。 メッセージを完了 (または解決しない場合) するには、ロックを維持する必要があり、期限切れになっていることはできません。期限切れのロックを更新することはできません。 RECEIVE_AND_DELETE モードで受信したメッセージはロックされないため、更新できません。 この操作は、セッションフルでないメッセージに対してのみ使用できます。 |
abandon_message
メッセージを破棄します。
このメッセージはキューに返され、再び受信できるようになります。
async abandon_message(message: ServiceBusReceivedMessage) -> None
パラメーター
の戻り値の型 :
例外
例
受信したメッセージを破棄します。
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.abandon_message(message)
close
async close() -> None
例外
complete_message
メッセージを完了します。
これにより、キューからメッセージが削除されます。
async complete_message(message: ServiceBusReceivedMessage) -> None
パラメーター
の戻り値の型 :
例外
例
受信したメッセージを完了します。
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.complete_message(message)
dead_letter_message
メッセージを配信不能キューに移動します。
配信不能キューは、正しく処理できなかったメッセージを格納するために使用できるサブキューです。それ以外の場合は、さらに検査または処理が必要です。 キューは、期限切れのメッセージを配信不能キューに送信するように構成することもできます。
async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
パラメーター
の戻り値の型 :
例外
例
受信したメッセージの配信不能。
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.dead_letter_message(message)
defer_message
メッセージを延期します。
このメッセージはキューに残りますが、受信するには、そのシーケンス番号で特に要求する必要があります。
async defer_message(message: ServiceBusReceivedMessage) -> None
パラメーター
の戻り値の型 :
例外
例
受信したメッセージを延期します。
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.defer_message(message)
peek_messages
キューで現在保留中のメッセージを参照します。
ピークされたメッセージはキューから削除されず、ロックもされません。 完了、遅延、または配信不能にすることはできません。
async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
パラメーター
- sequence_number
- int
メッセージの参照を開始するメッセージ シーケンス番号。
すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。
戻り値
~azure.servicebus.ServiceBusReceivedMessage オブジェクトの一覧。
の戻り値の型 :
例外
例
キュー内のメッセージをピークします。
async with servicebus_receiver:
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
以前に延期されたメッセージを受信します。
パーティション分割されたエンティティから遅延メッセージを受信する場合は、指定されたすべてのシーケンス番号が同じパーティションからのメッセージである必要があります。
async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
パラメーター
すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。
戻り値
受信したメッセージの一覧。
の戻り値の型 :
例外
例
ServiceBus から遅延メッセージを受信します。
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
await servicebus_receiver.defer_message(message)
received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for message in received_deferred_msg:
await servicebus_receiver.complete_message(message)
receive_messages
一度にメッセージのバッチを受信します。
この方法は、複数のメッセージを同時に処理する場合や、アドホック受信を 1 回の呼び出しとして実行する場合に最適です。
1 つのバッチで取得されるメッセージの数は、受信側に prefetch_count が設定されているかどうかによって異なります。 受信側prefetch_count設定されていない場合、受信側はサービスへの要求内でメッセージmax_message_count (指定されている場合) をキャッシュしようとします。
この呼び出しは、指定されたバッチ サイズを満たすよりも迅速に返されることを優先するため、少なくとも 1 つのメッセージが受信され、指定されたバッチ サイズに関係なく受信メッセージにギャップが発生するとすぐにが返されます。
async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
パラメーター
バッチ内のメッセージの最大数。 返される実際の数は、prefetch_countサイズと受信ストリームレートによって異なります。 [なし] に設定すると、プリフェッチ構成に完全に依存します。既定値は 1 です。
最初のメッセージが到着するまでの最大待機時間 (秒単位)。 メッセージが到着せず、タイムアウトが指定されていない場合、この呼び出しは接続が閉じられるまで戻りません。 指定した場合、タイムアウト期間内にメッセージが到着しなかった場合は、空のリストが返されます。
戻り値
受信したメッセージの一覧。 使用可能なメッセージがない場合は、空のリストになります。
の戻り値の型 :
例外
例
ServiceBus からメッセージを受信します。
async with servicebus_receiver:
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
await servicebus_receiver.complete_message(message)
renew_message_lock
メッセージ ロックを更新します。
これにより、メッセージが再処理されるキューに返されないように、メッセージのロックが維持されます。
メッセージを完了 (または解決しない場合) するには、ロックを維持する必要があり、期限切れになっていることはできません。期限切れのロックを更新することはできません。
RECEIVE_AND_DELETE モードで受信したメッセージはロックされないため、更新できません。 この操作は、セッションフルでないメッセージに対してのみ使用できます。
async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
パラメーター
すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。
戻り値
ロックが期限切れに設定されている utc datetime。
の戻り値の型 :
例外
例
受信したメッセージのロックを更新します。
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.renew_message_lock(message)
属性
client_identifier
session
受信側にリンクされている ServiceBusSession オブジェクトを取得します。 セッションはセッションが有効なエンティティでのみ使用できます。セッションが有効でない受信者で呼び出された場合は None が返されます。
の戻り値の型 :
例
レシーバーからセッションを取得する
async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session