Dela via


ServiceBusClient Class

The ServiceBusClient class defines a high level interface for getting ServiceBusSender and ServiceBusReceiver.

Inheritance
builtins.object
ServiceBusClient

Constructor

ServiceBusClient(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any)

Parameters

Name Description
fully_qualified_namespace
Required
str

The fully qualified host name for the Service Bus namespace. The namespace format is: .servicebus.windows.net.

credential
Required

The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the *get_token(self, scopes) method, or alternatively, an AzureSasCredential can be provided too.

Keyword-Only Parameters

Name Description
logging_enable

Whether to output network trace logs to the logger. Default is False.

transport_type

The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.

http_proxy

HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'.

user_agent
str

If specified, this will be added in front of the built-in user agent string.

retry_total
int

The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

Default value: 3
retry_backoff_factor

Delta back-off internal in the unit of second between retries. Default value is 0.8.

Default value: 0.8
retry_backoff_max

Maximum back-off interval in the unit of second. Default value is 120.

Default value: 120
retry_mode
str

The delay behavior between retry attempts. Supported values are "fixed" or "exponential", where default is "exponential".

Default value: exponential
custom_endpoint_address
str

The custom endpoint address to use for establishing a connection to the Service Bus service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>". If port is not specified in the custom_endpoint_address, by default port 443 will be used.

connection_verify
str

Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.

ssl_context

The SSLContext object to use in the underlying Pure Python AMQP transport. If specified, connection_verify will be ignored.

uamqp_transport

Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.

Examples

Create a new instance of the ServiceBusClient.


   import os
   from azure.identity.aio import DefaultAzureCredential
   from azure.servicebus.aio import ServiceBusClient

   fully_qualified_namespace = os.environ["SERVICEBUS_FULLY_QUALIFIED_NAMESPACE"]
   servicebus_client = ServiceBusClient(
       fully_qualified_namespace=fully_qualified_namespace, credential=DefaultAzureCredential()
   )

Variables

Name Description
fully_qualified_namespace
str

The fully qualified host name for the Service Bus namespace. The namespace format is: .servicebus.windows.net.

Methods

close

Close down the ServiceBus client. All spawned senders, receivers and underlying connection will be shutdown.

from_connection_string

Create a ServiceBusClient from a connection string.

get_queue_receiver

Get ServiceBusReceiver for the specific queue.

get_queue_sender

Get ServiceBusSender for the specific queue.

get_subscription_receiver

Get ServiceBusReceiver for the specific subscription under the topic.

get_topic_sender

Get ServiceBusSender for the specific topic.

close

Close down the ServiceBus client. All spawned senders, receivers and underlying connection will be shutdown.

async close() -> None

Returns

Type Description

None

from_connection_string

Create a ServiceBusClient from a connection string.

from_connection_string(conn_str: str, *, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: str = 'exponential', **kwargs: Any) -> ServiceBusClient

Parameters

Name Description
conn_str
Required
str

The connection string of a Service Bus.

Keyword-Only Parameters

Name Description
logging_enable

Whether to output network trace logs to the logger. Default is False.

transport_type

The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.

http_proxy

HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'.

user_agent
str

If specified, this will be added in front of the built-in user agent string.

retry_total
int

The total number of attempts to redo a failed operation when an error occurs. Default value is 3.

Default value: 3
retry_backoff_factor

Delta back-off internal in the unit of second between retries. Default value is 0.8.

Default value: 0.8
retry_backoff_max

Maximum back-off interval in the unit of second. Default value is 120.

Default value: 120
retry_mode
str

The delay behavior between retry attempts. Supported values are 'fixed' or 'exponential', where default is 'exponential'.

Default value: exponential
custom_endpoint_address
str

The custom endpoint address to use for establishing a connection to the Service Bus service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>". If port is not specified in the custom_endpoint_address, by default port 443 will be used.

connection_verify
str

Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.

ssl_context

The SSLContext object to use in the underlying Pure Python AMQP transport. If specified, connection_verify will be ignored.

uamqp_transport

Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.

Returns

Type Description

The ServiceBusCLient instance.

Examples

Create a new instance of the ServiceBusClient from connection string.


   import os
   from azure.servicebus.aio import ServiceBusClient

   servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)

get_queue_receiver

Get ServiceBusReceiver for the specific queue.

get_queue_receiver(queue_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver

Parameters

Name Description
queue_name
Required
str

The path of specific Service Bus Queue the client connects to.

Keyword-Only Parameters

Name Description
session_id
str or <xref:azure.servicebus.NEXT_AVAILABLE_SESSION> or None

A specific session from which to receive. This must be specified for a sessionful queue, otherwise it must be None. In order to receive messages from the next available session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.

sub_queue

If specified, the subqueue this receiver will connect to. This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any receiver or messages that can't be processed. The default is None, meaning connect to the primary queue. Can be assigned values from ServiceBusSubQueue enum or equivalent string values "deadletter" and "transferdeadletter".

receive_mode

The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PEEK_LOCK.

Default value: ServiceBusReceiveMode.PEEK_LOCK
max_wait_time

The timeout in seconds to wait for the first and subsequent messages to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. The default value is None, meaning no timeout. On a sessionful queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. If connection errors are occurring due to write timing out,the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.

auto_lock_renewer

An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.

prefetch_count
int

The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service. WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in the in-memory prefetch buffer until they're received into the application. If the application ends before the messages are received into the application, those messages will be lost and unable to be recovered. Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.

client_identifier
str

A string-based identifier to uniquely identify the receiver instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

socket_timeout

The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns

Type Description

The ServiceBusReceiver for the queue.

Examples

Create a new instance of the ServiceBusSender from ServiceBusClient.


   import os
   from azure.servicebus.aio import ServiceBusClient

   servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
   queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   async with servicebus_client:
       queue_receiver = servicebus_client.get_queue_receiver(queue_name=queue_name)

get_queue_sender

Get ServiceBusSender for the specific queue.

get_queue_sender(queue_name: str, **kwargs: Any) -> ServiceBusSender

Parameters

Name Description
queue_name
Required
str

The path of specific Service Bus Queue the client connects to.

Keyword-Only Parameters

Name Description
client_identifier
str

A string-based identifier to uniquely identify the sender instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

socket_timeout

The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns

Type Description

A queue sender.

Examples

Create a new instance of the ServiceBusClient from connection string.


   import os
   from azure.servicebus.aio import ServiceBusClient

   servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
   queue_name = os.environ["SERVICEBUS_QUEUE_NAME"]
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   async with servicebus_client:
       queue_sender = servicebus_client.get_queue_sender(queue_name=queue_name)

get_subscription_receiver

Get ServiceBusReceiver for the specific subscription under the topic.

get_subscription_receiver(topic_name: str, subscription_name: str, *, session_id: str | ~typing.Literal[<ServiceBusSessionFilter.NEXT_AVAILABLE: 0>] | None = None, sub_queue: ~azure.servicebus._common.constants.ServiceBusSubQueue | str | None = None, receive_mode: ~azure.servicebus._common.constants.ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: ~azure.servicebus._common.auto_lock_renewer.AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: ~typing.Any) -> ServiceBusReceiver

Parameters

Name Description
topic_name
Required
str

The name of specific Service Bus Topic the client connects to.

subscription_name
Required
str

The name of specific Service Bus Subscription under the given Service Bus Topic.

Keyword-Only Parameters

Name Description
session_id
str or <xref:azure.servicebus.NEXT_AVAILABLE_SESSION> or None

A specific session from which to receive. This must be specified for a sessionful subscription, otherwise it must be None. In order to receive messages from the next available session, set this to ~azure.servicebus.NEXT_AVAILABLE_SESSION.

sub_queue

If specified, the subqueue this receiver will connect to. This includes the DEAD_LETTER and TRANSFER_DEAD_LETTER queues, holds messages that can't be delivered to any receiver or messages that can't be processed. The default is None, meaning connect to the primary queue. Can be assigned values from ServiceBusSubQueue enum or equivalent string values "deadletter" and "transferdeadletter".

receive_mode

The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the subscription. Messages received with RECEIVE_AND_DELETE will be immediately removed from the subscription, and cannot be subsequently rejected or re-received if the client fails to process the message. The default mode is PEEK_LOCK.

Default value: ServiceBusReceiveMode.PEEK_LOCK
max_wait_time

The timeout in seconds to wait for the first and subsequent messages to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. The default value is None, meaning no timeout. On a sessionful queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting. If connection errors are occurring due to write timing out,the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.

auto_lock_renewer

An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.

prefetch_count
int

The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service. WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in the in-memory prefetch buffer until they're received into the application. If the application ends before the messages are received into the application, those messages will be lost and unable to be recovered. Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.

client_identifier
str

A string-based identifier to uniquely identify the receiver instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

socket_timeout

The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns

Type Description

The ServiceBusReceiver for the topic subscription.

Examples

Create a new instance of the ServiceBusReceiver from ServiceBusClient.


   import os
   from azure.servicebus.aio import ServiceBusClient

   servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
   topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
   subscription_name = os.environ["SERVICEBUS_SUBSCRIPTION_NAME"]
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   async with servicebus_client:
       subscription_receiver = servicebus_client.get_subscription_receiver(
           topic_name=topic_name,
           subscription_name=subscription_name,
       )

get_topic_sender

Get ServiceBusSender for the specific topic.

get_topic_sender(topic_name: str, **kwargs: Any) -> ServiceBusSender

Parameters

Name Description
topic_name
Required
str

The path of specific Service Bus Topic the client connects to.

Keyword-Only Parameters

Name Description
client_identifier
str

A string-based identifier to uniquely identify the sender instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

socket_timeout

The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in.

Returns

Type Description

A topic sender.

Examples

Create a new instance of the ServiceBusSender from ServiceBusClient.


   import os
   from azure.servicebus.aio import ServiceBusClient

   servicebus_connection_str = os.environ["SERVICEBUS_CONNECTION_STR"]
   topic_name = os.environ["SERVICEBUS_TOPIC_NAME"]
   servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str)
   async with servicebus_client:
       topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name)