次の方法で共有


ServiceBusReceiver クラス

ServiceBusReceiver クラスは、Azure Service Bus キューまたはトピック サブスクリプションからメッセージを受信するための高レベルのインターフェイスを定義します。

メッセージ受信の 2 つのプライマリ チャネルは、メッセージに対して 1 つの要求を行う receive() と、 受信側のメッセージの非同期チャネル です。継続的な方法で受信メッセージを継続的に受信します。

~azure.servicebus.aio.ServiceBusClient のメソッドを使用 get_<queue/subscription>_receiver して、ServiceBusReceiver インスタンスを作成してください。

継承
ServiceBusReceiver
azure.servicebus.aio._base_handler_async.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

コンストラクター

ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

パラメーター

fully_qualified_namespace
str
必須

Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

credential
AsyncTokenCredential または AzureSasCredential または AzureNamedKeyCredential
必須

トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れます。または、AzureSasCredential も指定できます。

queue_name
str

クライアントが接続する特定の Service Bus キューのパス。

topic_name
str

クライアントが接続するサブスクリプションを含む特定の Service Bus トピックのパス。

subscription_name
str

クライアントが接続する指定されたトピックの下にある特定の Service Bus サブスクリプションのパス。

receive_mode
Union[ServiceBusReceiveMode, str]

エンティティからメッセージを取得するモード。 2 つのオプションは、PEEK_LOCKとRECEIVE_AND_DELETEです。 PEEK_LOCKで受信したメッセージは、キューから削除される前に、特定のロック期間内に解決する必要があります。 RECEIVE_AND_DELETEで受信したメッセージはすぐにキューから削除され、クライアントがメッセージの処理に失敗した場合は、その後破棄または再受信することはできません。 既定のモードはPEEK_LOCKです。

max_wait_time
Optional[float]

受信したメッセージの間のタイムアウト (秒単位)。その後、受信側は自動的に受信を停止します。 既定値は None で、タイムアウトがないことを意味します。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

transport_type
TransportType

Service Bus サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です

http_proxy
Dict

HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'

user_agent
str

指定した場合、これは組み込みのユーザー エージェント文字列の前に追加されます。

auto_lock_renewer
Optional[AutoLockRenewer]

メッセージが受信時に自動的に登録されるように、~azure.servicebus.aio.AutoLockRenewer を指定できます。 受信側がセッション レシーバーの場合は、代わりにセッションに適用されます。

prefetch_count
int

サービスへの各要求でキャッシュするメッセージの最大数。 この設定は、高度なパフォーマンス チューニングのみを目的としています。 この値を大きくすると、メッセージスループットのパフォーマンスが向上しますが、十分に高速に処理されていない場合は、メッセージがキャッシュされている間にメッセージが期限切れになる可能性が高くなります。 既定値は 0 です。つまり、メッセージはサービスから受信され、一度に 1 つずつ処理されます。 prefetch_countが 0 の場合、 ServiceBusReceiver.receive はサービスへの要求内 でmax_message_count (指定されている場合) をキャッシュしようとします。

client_identifier
str

クライアント インスタンスを一意に識別するための文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、いくつかのエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。

socket_timeout
float

接続の基になるソケットが、データの送受信時にタイムアウトするまで待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要がある場合があります。

変数

fully_qualified_namespace
str

Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

entity_path
str

クライアントが接続するエンティティのパス。

メソッド

abandon_message

メッセージを破棄します。

このメッセージはキューに返され、再び受信できるようになります。

close
complete_message

メッセージを完了します。

これにより、キューからメッセージが削除されます。

dead_letter_message

メッセージを配信不能キューに移動します。

配信不能キューは、正しく処理できなかったメッセージを格納するために使用できるサブキューです。それ以外の場合は、さらに検査または処理が必要です。 キューは、期限切れのメッセージを配信不能キューに送信するように構成することもできます。

defer_message

メッセージを延期します。

このメッセージはキューに残りますが、受信するには、そのシーケンス番号で特に要求する必要があります。

peek_messages

キューで現在保留中のメッセージを参照します。

ピークされたメッセージはキューから削除されず、ロックもされません。 完了、遅延、または配信不能にすることはできません。

receive_deferred_messages

以前に延期されたメッセージを受信します。

パーティション分割されたエンティティから遅延メッセージを受信する場合は、指定されたすべてのシーケンス番号が同じパーティションからのメッセージである必要があります。

receive_messages

一度にメッセージのバッチを受信します。

この方法は、複数のメッセージを同時に処理する場合や、アドホック受信を 1 回の呼び出しとして実行する場合に最適です。

1 つのバッチで取得されるメッセージの数は、受信側に prefetch_count が設定されているかどうかによって異なります。 受信側prefetch_count設定されていない場合、受信側はサービスへの要求内でメッセージmax_message_count (指定されている場合) をキャッシュしようとします。

この呼び出しは、指定されたバッチ サイズを満たすよりも迅速に返されることを優先するため、少なくとも 1 つのメッセージが受信され、指定されたバッチ サイズに関係なく受信メッセージにギャップが発生するとすぐにが返されます。

renew_message_lock

メッセージ ロックを更新します。

これにより、メッセージが再処理されるキューに返されないように、メッセージのロックが維持されます。

メッセージを完了 (または解決しない場合) するには、ロックを維持する必要があり、期限切れになっていることはできません。期限切れのロックを更新することはできません。

RECEIVE_AND_DELETE モードで受信したメッセージはロックされないため、更新できません。 この操作は、セッションフルでないメッセージに対してのみ使用できます。

abandon_message

メッセージを破棄します。

このメッセージはキューに返され、再び受信できるようになります。

async abandon_message(message: ServiceBusReceivedMessage) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

破棄されるメッセージを受信しました。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージを破棄します。


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.abandon_message(message)

close

async close() -> None

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

メッセージを完了します。

これにより、キューからメッセージが削除されます。

async complete_message(message: ServiceBusReceivedMessage) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

完了する受信メッセージ。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージを完了します。


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.complete_message(message)

dead_letter_message

メッセージを配信不能キューに移動します。

配信不能キューは、正しく処理できなかったメッセージを格納するために使用できるサブキューです。それ以外の場合は、さらに検査または処理が必要です。 キューは、期限切れのメッセージを配信不能キューに送信するように構成することもできます。

async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

配信不能のメッセージを受信しました。

reason
Optional[str]
既定値: None

メッセージを配信不能にする理由。

error_description
Optional[str]
既定値: None

メッセージの配信不能に関する詳細なエラーの説明。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージの配信不能。


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.dead_letter_message(message)

defer_message

メッセージを延期します。

このメッセージはキューに残りますが、受信するには、そのシーケンス番号で特に要求する必要があります。

async defer_message(message: ServiceBusReceivedMessage) -> None

パラメーター

message
ServiceBusReceivedMessage
必須

遅延する受信メッセージ。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

受信したメッセージを延期します。


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.defer_message(message)

peek_messages

キューで現在保留中のメッセージを参照します。

ピークされたメッセージはキューから削除されず、ロックもされません。 完了、遅延、または配信不能にすることはできません。

async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

パラメーター

max_message_count
int
既定値: 1

試行してピークするメッセージの最大数。 既定値は 1 です。

sequence_number
int

メッセージの参照を開始するメッセージ シーケンス番号。

timeout
Optional[float]

すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。

戻り値

~azure.servicebus.ServiceBusReceivedMessage オブジェクトの一覧。

の戻り値の型 :

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

キュー内のメッセージをピークします。


   async with servicebus_receiver:
       messages = await servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

以前に延期されたメッセージを受信します。

パーティション分割されたエンティティから遅延メッセージを受信する場合は、指定されたすべてのシーケンス番号が同じパーティションからのメッセージである必要があります。

async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

パラメーター

sequence_numbers
Union[int, list[int]]
必須

遅延されたメッセージのシーケンス番号の一覧。

timeout
Optional[float]

すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。

戻り値

受信したメッセージの一覧。

の戻り値の型 :

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

ServiceBus から遅延メッセージを受信します。


   async with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           await servicebus_receiver.defer_message(message)

       received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for message in received_deferred_msg:
           await servicebus_receiver.complete_message(message)

receive_messages

一度にメッセージのバッチを受信します。

この方法は、複数のメッセージを同時に処理する場合や、アドホック受信を 1 回の呼び出しとして実行する場合に最適です。

1 つのバッチで取得されるメッセージの数は、受信側に prefetch_count が設定されているかどうかによって異なります。 受信側prefetch_count設定されていない場合、受信側はサービスへの要求内でメッセージmax_message_count (指定されている場合) をキャッシュしようとします。

この呼び出しは、指定されたバッチ サイズを満たすよりも迅速に返されることを優先するため、少なくとも 1 つのメッセージが受信され、指定されたバッチ サイズに関係なく受信メッセージにギャップが発生するとすぐにが返されます。

async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

パラメーター

max_message_count
Optional[int]
既定値: 1

バッチ内のメッセージの最大数。 返される実際の数は、prefetch_countサイズと受信ストリームレートによって異なります。 [なし] に設定すると、プリフェッチ構成に完全に依存します。既定値は 1 です。

max_wait_time
Optional[float]
既定値: None

最初のメッセージが到着するまでの最大待機時間 (秒単位)。 メッセージが到着せず、タイムアウトが指定されていない場合、この呼び出しは接続が閉じられるまで戻りません。 指定した場合、タイムアウト期間内にメッセージが到着しなかった場合は、空のリストが返されます。

戻り値

受信したメッセージの一覧。 使用可能なメッセージがない場合は、空のリストになります。

の戻り値の型 :

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

ServiceBus からメッセージを受信します。


   async with servicebus_receiver:
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           await servicebus_receiver.complete_message(message)

renew_message_lock

メッセージ ロックを更新します。

これにより、メッセージが再処理されるキューに返されないように、メッセージのロックが維持されます。

メッセージを完了 (または解決しない場合) するには、ロックを維持する必要があり、期限切れになっていることはできません。期限切れのロックを更新することはできません。

RECEIVE_AND_DELETE モードで受信したメッセージはロックされないため、更新できません。 この操作は、セッションフルでないメッセージに対してのみ使用できます。

async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

パラメーター

message
ServiceBusReceivedMessage
必須

ロックを更新するメッセージ。

timeout
Optional[float]

すべての再試行を含む合計操作タイムアウト (秒単位)。 指定する場合、値は 0 より大きくする必要があります。 既定値は None で、タイムアウトがないことを意味します。

戻り値

ロックが期限切れに設定されている utc datetime。

の戻り値の型 :

例外

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

受信したメッセージのロックを更新します。


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.renew_message_lock(message)

属性

client_identifier

受信側インスタンスに関連付けられている ServiceBusReceiver クライアント識別子を取得します。

の戻り値の型 :

str

session

受信側にリンクされている ServiceBusSession オブジェクトを取得します。 セッションはセッションが有効なエンティティでのみ使用できます。セッションが有効でない受信者で呼び出された場合は None が返されます。

の戻り値の型 :

レシーバーからセッションを取得する


       async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session