ServiceBusClient クラス
ServiceBusClient クラスは、ServiceBusSender と ServiceBusReceiver を取得するための高度なインターフェイスを定義します。
- 継承
-
builtins.objectServiceBusClient
コンストラクター
ServiceBusClient(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any)
パラメーター
- credential
- AsyncTokenCredential または AzureSasCredential または AzureNamedKeyCredential
トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れます。または、AzureSasCredential も指定できます。
- logging_enable
- bool
ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。
- transport_type
- TransportType
Service Bus サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。
- http_proxy
- Dict
HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'。
- user_agent
- str
指定した場合、これは組み込みのユーザー エージェント文字列の前に追加されます。
- retry_total
- int
エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。
- retry_backoff_factor
- float
再試行の間の秒単位の内部差分バックオフ。 既定値は 0.8 です。
- retry_backoff_max
- float
2 番目の単位の最大バックオフ間隔。 既定値は 120 です。
- retry_mode
- str
再試行の間の遅延動作。 サポートされている値は "固定" または "指数" で、既定値は "指数" です。
- custom_endpoint_address
- str
Service Bus サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。
- connection_verify
- str
接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。
- uamqp_transport
- bool
基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。
例
ServiceBusClient の新しいインスタンスを作成します。
import os
from azure.identity.aio import DefaultAzureCredential
from azure.servicebus.aio import ServiceBusClient
fully_qualified_namespace = os.environ['SERVICEBUS_FULLY_QUALIFIED_NAMESPACE']
servicebus_client = ServiceBusClient(
fully_qualified_namespace=fully_qualified_namespace,
credential=DefaultAzureCredential()
)
変数
- fully_qualified_namespace
- str
Service Bus 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。
メソッド
close |
ServiceBus クライアントを閉じます。 生成されたすべての送信者、受信側、および基になる接続がシャットダウンされます。 |
from_connection_string |
接続文字列から ServiceBusClient を作成します。 |
get_queue_receiver |
特定のキューの ServiceBusReceiver を取得します。 |
get_queue_sender |
特定のキューの ServiceBusSender を取得します。 |
get_subscription_receiver |
トピックの特定のサブスクリプションの ServiceBusReceiver を取得します。 |
get_topic_sender |
特定のトピックの ServiceBusSender を取得します。 |
close
ServiceBus クライアントを閉じます。 生成されたすべての送信者、受信側、および基になる接続がシャットダウンされます。
async close() -> None
戻り値
なし
from_connection_string
接続文字列から ServiceBusClient を作成します。
from_connection_string(conn_str: str, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any) -> ServiceBusClient
パラメーター
- logging_enable
- bool
ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。
- transport_type
- TransportType
Service Bus サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。
- http_proxy
- Dict
HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'。
- user_agent
- str
指定した場合、これは組み込みのユーザー エージェント文字列の前に追加されます。
- retry_total
- int
エラーが発生した場合に失敗した操作をやり直す試行の合計数。 既定値は 3 です。
- retry_backoff_factor
- float
再試行の間の秒単位の内部差分バックオフ。 既定値は 0.8 です。
- retry_backoff_max
- float
2 番目の単位の最大バックオフ間隔。 既定値は 120 です。
- retry_mode
- str
再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' で、既定値は 'exponential' です。
- custom_endpoint_address
- str
Service Bus サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。
- connection_verify
- str
接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。
- uamqp_transport
- bool
基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。
の戻り値の型 :
例
接続文字列から ServiceBusClient の新しいインスタンスを作成します。
import os
from azure.servicebus.aio import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
get_queue_receiver
特定のキューの ServiceBusReceiver を取得します。
get_queue_receiver(queue_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver
パラメーター
受信元の特定のセッション。 これはセッションフル キューに対して指定する必要があります。それ以外の場合は None にする必要があります。 次に使用可能なセッションからメッセージを受信するには、これを ~azure.servicebus.NEXT_AVAILABLE_SESSION に設定します。
- sub_queue
- str または ServiceBusSubQueue または None
指定した場合、このレシーバーが接続するサブキュー。 これには、DEAD_LETTERキューとTRANSFER_DEAD_LETTERキューが含まれます。処理できない受信者またはメッセージに配信できないメッセージを保持します。 既定値は None です。つまり、プライマリ キューへの接続を意味します。 ServiceBusSubQueue 列挙型または同等の文字列値 "deadletter" と "transferdeadletter" から値を割り当てることができます。
- receive_mode
- Union[ServiceBusReceiveMode, str]
エンティティからメッセージを取得するモード。 PEEK_LOCKとRECEIVE_AND_DELETEの 2 つのオプションがあります。 PEEK_LOCKで受信したメッセージは、キューから削除される前に、特定のロック期間内に解決する必要があります。 RECEIVE_AND_DELETEで受信したメッセージはキューから直ちに削除され、クライアントがメッセージの処理に失敗した場合は、その後拒否または再受信することはできません。 既定のモードはPEEK_LOCK。
受信したメッセージの間のタイムアウト (秒単位)。その後、受信側は自動的に受信を停止します。 既定値は None で、タイムアウトがないことを意味します。 書き込みタイムアウトが原因で接続エラーが発生している場合は、接続タイムアウト値の調整が必要になる場合があります。 詳細については、 省略可能なパラメーター socket_timeout 参照してください。
- auto_lock_renewer
- Optional[AutoLockRenewer]
メッセージが受信時に自動的に登録されるように、~azure.servicebus.aio.AutoLockRenewer を指定できます。 受信側がセッション レシーバーの場合は、代わりにセッションに適用されます。
- prefetch_count
- int
サービスへの各要求でキャッシュするメッセージの最大数。 この設定は、高度なパフォーマンス チューニングのみを目的としています。 この値を大きくすると、メッセージスループットのパフォーマンスが向上しますが、十分に高速に処理されていない場合は、メッセージがキャッシュされている間にメッセージが期限切れになる可能性が高くなります。 既定値は 0 です。つまり、メッセージはサービスから受信され、一度に 1 つずつ処理されます。 prefetch_countが 0 の場合、 ServiceBusReceiver.receive はサービスへの要求内 でmax_message_count (指定されている場合) をキャッシュしようとします。
- client_identifier
- str
受信側インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。
- socket_timeout
- float
データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。
の戻り値の型 :
例
ServiceBusClient から ServiceBusSender の新しいインスタンスを作成します。
import os
from azure.servicebus.aio import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)
get_queue_sender
特定のキューの ServiceBusSender を取得します。
get_queue_sender(queue_name: str, **kwargs: Any) -> ServiceBusSender
パラメーター
- client_identifier
- str
送信者インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。
- socket_timeout
- float
データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。
戻り値
キューの送信者。
の戻り値の型 :
例
接続文字列から ServiceBusClient の新しいインスタンスを作成します。
import os
from azure.servicebus.aio import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
queue_name = os.environ['SERVICEBUS_QUEUE_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)
get_subscription_receiver
トピックの特定のサブスクリプションの ServiceBusReceiver を取得します。
get_subscription_receiver(topic_name: str, subscription_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver
パラメーター
受信元の特定のセッション。 これはセッションフル サブスクリプションに対して指定する必要があります。それ以外の場合は None にする必要があります。 次に使用可能なセッションからメッセージを受信するには、これを ~azure.servicebus.NEXT_AVAILABLE_SESSION に設定します。
- sub_queue
- str または ServiceBusSubQueue または None
指定した場合、このレシーバーが接続するサブキュー。 これには、DEAD_LETTERキューとTRANSFER_DEAD_LETTERキューが含まれます。処理できない受信者またはメッセージに配信できないメッセージを保持します。 既定値は None です。つまり、プライマリ キューへの接続を意味します。 ServiceBusSubQueue 列挙型または同等の文字列値 "deadletter" と "transferdeadletter" から値を割り当てることができます。
- receive_mode
- Union[ServiceBusReceiveMode, str]
エンティティからメッセージを取得するモード。 PEEK_LOCKとRECEIVE_AND_DELETEの 2 つのオプションがあります。 PEEK_LOCKで受信したメッセージは、サブスクリプションから削除される前に、特定のロック期間内に解決する必要があります。 RECEIVE_AND_DELETEで受信したメッセージはサブスクリプションから直ちに削除され、クライアントがメッセージの処理に失敗した場合は、その後拒否または再受信することはできません。 既定のモードはPEEK_LOCK。
受信したメッセージの間のタイムアウト (秒単位)。その後、受信側は自動的に受信を停止します。 既定値は None で、タイムアウトがないことを意味します。 書き込みタイムアウトが原因で接続エラーが発生している場合は、接続タイムアウト値の調整が必要になる場合があります。 詳細については、 省略可能なパラメーター socket_timeout 参照してください。
- auto_lock_renewer
- Optional[AutoLockRenewer]
メッセージが受信時に自動的に登録されるように、~azure.servicebus.aio.AutoLockRenewer を指定できます。 受信側がセッション レシーバーの場合は、代わりにセッションに適用されます。
- prefetch_count
- int
サービスへの各要求でキャッシュするメッセージの最大数。 この設定は、高度なパフォーマンス チューニングのみを目的としています。 この値を大きくすると、メッセージスループットのパフォーマンスが向上しますが、十分に高速に処理されていない場合は、メッセージがキャッシュされている間にメッセージが期限切れになる可能性が高くなります。 既定値は 0 です。つまり、メッセージはサービスから受信され、一度に 1 つずつ処理されます。 prefetch_countが 0 の場合、 ServiceBusReceiver.receive はサービスへの要求内 でmax_message_count (指定されている場合) をキャッシュしようとします。
- client_identifier
- str
受信側インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。
- socket_timeout
- float
データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。
の戻り値の型 :
例
ServiceBusClient から ServiceBusReceiver の新しいインスタンスを作成します。
import os
from azure.servicebus.aio import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
subscription_receiver = servicebus_client.get_subscription_receiver(
topic_name=topic_name,
subscription_name=subscription_name,
)
get_topic_sender
特定のトピックの ServiceBusSender を取得します。
get_topic_sender(topic_name: str, **kwargs: Any) -> ServiceBusSender
パラメーター
- client_identifier
- str
送信者インスタンスを一意に識別する文字列ベースの識別子。 Service Bus は、エラーの関連付けを容易にするために、一部のエラー メッセージに関連付けます。 指定しない場合は、一意の ID が生成されます。
- socket_timeout
- float
データの送受信時に接続の基になるソケットがタイムアウトするまでに待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で接続エラーが発生している場合は、既定値より大きい値を渡す必要があります。
戻り値
トピックの送信者。
の戻り値の型 :
例
ServiceBusClient から ServiceBusSender の新しいインスタンスを作成します。
import os
from azure.servicebus.aio import ServiceBusClient
servicebus_connection_str = os.environ['SERVICEBUS_CONNECTION_STR']
topic_name = os.environ['SERVICEBUS_TOPIC_NAME']
servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
async with servicebus_client:
topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)