ServiceBusReceiver Class

The ServiceBusReceiver class defines a high-level interface for receiving messages from an Azure Service Bus Queue or Topic Subscription.

The two primary channels for message receipt are receive_messages() to make a single request for messages, and async for message in receiver: to receive incoming messages continuously.

Use the get_<queue/subscription>_receiver method of azure.servicebus.aio.ServiceBusClient to create a ServiceBusReceiver instance.
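As a rough illustration of the continuous channel, the sketch below iterates a receiver with async for and completes each message after a handler has run. The helper name process_stream and the handle callback are hypothetical. The receiver is assumed to be an already-open ServiceBusReceiver in PEEK_LOCK mode, created with a max_wait_time so that iteration is expected to end once no message arrives within that period.

```python
async def process_stream(receiver, handle):
    """Handle and complete every message the receiver yields; return the count."""
    processed = 0
    # Each iteration waits for the next incoming message.
    async for message in receiver:
        await handle(message)
        # Settle only after the handler succeeded, so failures stay redeliverable.
        await receiver.complete_message(message)
        processed += 1
    return processed
```

Completing after the handler returns (rather than before) means an exception in the handler leaves the message locked but unsettled, so it can be redelivered once the lock expires.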

Inheritance
azure.servicebus.aio._base_handler_async.BaseHandler
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Constructor

ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Parameters

Name Description
fully_qualified_namespace
Required
str

The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespacename>.servicebus.windows.net.

credential
Required

The credential object used for authentication, which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the get_token(self, scopes) method. Alternatively, an AzureSasCredential or AzureNamedKeyCredential can be provided.

Keyword-Only Parameters

Name Description
queue_name
str

The path of the specific Service Bus Queue the client connects to.

topic_name
str

The path of the specific Service Bus Topic that contains the Subscription the client connects to.

subscription_name
str

The path of the specific Service Bus Subscription, under the specified Topic, that the client connects to.

receive_mode

The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PEEK_LOCK.

Default value: ServiceBusReceiveMode.PEEK_LOCK
max_wait_time

The timeout in seconds to wait for the first and subsequent messages to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. The default value is None, meaning no timeout. On a sessionful queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting to a session. If connection errors are occurring due to writes timing out, the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details.

logging_enable

Whether to output network trace logs to the logger. Default is False.

transport_type

The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.

http_proxy

HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'.

user_agent
str

If specified, this will be added in front of the built-in user agent string.

auto_lock_renewer

An azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.

prefetch_count
int

The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service. WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in the in-memory prefetch buffer until they're received into the application. If the application ends before the messages are received into the application, those messages will be lost and unable to be recovered. Therefore, it's recommended that PEEK_LOCK mode be used with prefetch.

client_identifier
str

A string-based identifier to uniquely identify the client instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated.

socket_timeout

The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to writes timing out, a larger-than-default value may need to be passed in.
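To make the prefetch_count warning above concrete, here is a minimal sketch of a helper that assembles receiver keyword arguments and refuses the combination the warning describes. The helper build_receiver_options is hypothetical and the check is a local policy, not something the SDK enforces. The string values "peeklock" and "receiveanddelete" are assumed to be the string forms of ServiceBusReceiveMode.PEEK_LOCK and ServiceBusReceiveMode.RECEIVE_AND_DELETE.

```python
def build_receiver_options(*, receive_mode="peeklock", prefetch_count=0,
                           max_wait_time=None):
    """Assemble keyword arguments for a receiver, rejecting a risky combination."""
    # Local policy (not enforced by the SDK): prefetched messages in
    # RECEIVE_AND_DELETE mode are lost if the application exits before
    # reading them out of the in-memory buffer.
    if prefetch_count > 0 and receive_mode == "receiveanddelete":
        raise ValueError("use PEEK_LOCK when prefetch_count > 0")
    return {
        "receive_mode": receive_mode,
        "prefetch_count": prefetch_count,
        "max_wait_time": max_wait_time,
    }
```

The resulting dictionary could then be unpacked into the get_<queue/subscription>_receiver call, for example get_queue_receiver(queue_name, **options).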

Variables

Name Description
fully_qualified_namespace
str

The fully qualified host name for the Service Bus namespace. The namespace format is: <yournamespacename>.servicebus.windows.net.

entity_path
str

The path of the entity that the client connects to.

Methods

abandon_message

Abandon the message.

This message will be returned to the queue and made available to be received again.

close
complete_message

Complete the message.

This removes the message from the queue.

dead_letter_message

Move the message to the Dead Letter queue.

The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.

defer_message

Defers the message.

This message will remain in the queue but must be requested specifically by its sequence number in order to be received.

peek_messages

Browse messages currently pending in the queue.

Peeked messages are not removed from the queue, nor are they locked. They cannot be completed, deferred or dead-lettered.

receive_deferred_messages

Receive messages that have previously been deferred.

When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.

receive_messages

Receive a batch of messages at once.

This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.

Note that the number of messages retrieved in a single batch will be dependent on whether prefetch_count was set for the receiver. If prefetch_count is not set for the receiver, the receiver would try to cache max_message_count (if provided) messages within the request to the service.

This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.

renew_message_lock

Renew the message lock.

This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed.

In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed.

Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is also only available for non-sessionful messages.

abandon_message

Abandon the message.

This message will be returned to the queue and made available to be received again.

async abandon_message(message: ServiceBusReceivedMessage) -> None

Parameters

Name Description
message
Required

The received message to be abandoned.

Returns

Type Description

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Examples

Abandon a received message.


    # Assumes an open receiver in PEEK_LOCK mode, called from within a coroutine.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.abandon_message(message)

close

async close() -> None

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Complete the message.

This removes the message from the queue.

async complete_message(message: ServiceBusReceivedMessage) -> None

Parameters

Name Description
message
Required

The received message to be completed.

Returns

Type Description

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Examples

Complete a received message.


    # Assumes an open receiver in PEEK_LOCK mode, called from within a coroutine.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.complete_message(message)

dead_letter_message

Move the message to the Dead Letter queue.

The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.

async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Parameters

Name Description
message
Required

The received message to be dead-lettered.

reason

The reason for dead-lettering the message.

Default value: None
error_description

The detailed error description for dead-lettering the message.

Default value: None

Returns

Type Description

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Examples

Dead letter a received message.


    # Assumes an open receiver in PEEK_LOCK mode, called from within a coroutine.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.dead_letter_message(message)
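The settlement methods are usually chosen by processing outcome. The following is one possible sketch, not a prescribed pattern: it completes a message on success, dead-letters it with a reason and error_description on a failure that retrying cannot fix, and abandons it otherwise. PermanentError, settle_by_outcome, the handle callback and the "ProcessingFailed" reason string are all hypothetical names introduced for illustration.

```python
class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""


async def settle_by_outcome(receiver, message, handle):
    """Complete on success, dead-letter permanent failures, abandon the rest."""
    try:
        await handle(message)
    except PermanentError as exc:
        # Keep the failure details with the message for later inspection.
        await receiver.dead_letter_message(
            message,
            reason="ProcessingFailed",
            error_description=str(exc),
        )
        return "dead-lettered"
    except Exception:
        # Treated as transient: return the message to the queue for redelivery.
        await receiver.abandon_message(message)
        return "abandoned"
    await receiver.complete_message(message)
    return "completed"
```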

defer_message

Defers the message.

This message will remain in the queue but must be requested specifically by its sequence number in order to be received.

async defer_message(message: ServiceBusReceivedMessage) -> None

Parameters

Name Description
message
Required

The received message to be deferred.

Returns

Type Description

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Examples

Defer a received message.


    # Assumes an open receiver in PEEK_LOCK mode, called from within a coroutine.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.defer_message(message)

peek_messages

Browse messages currently pending in the queue.

Peeked messages are not removed from the queue, nor are they locked. They cannot be completed, deferred or dead-lettered.

async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parameters

Name Description
max_message_count
int

The maximum number of messages to try and peek. The default value is 1.

Default value: 1

Keyword-Only Parameters

Name Description
sequence_number
int

A message sequence number from which to start browsing messages.

timeout

The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns

Type Description

A list of azure.servicebus.ServiceBusReceivedMessage objects.

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Examples

Peek messages in the queue.


    async with servicebus_receiver:
        messages = await servicebus_receiver.peek_messages()
        for message in messages:
            print(str(message))
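The sequence_number parameter makes it possible to browse an entity in pages. A minimal sketch, assuming each peeked message exposes a sequence_number attribute and that sequence numbers increase along the browse order; the helper name peek_all and the page_size parameter are hypothetical:

```python
async def peek_all(receiver, page_size=50):
    """Peek every pending message by paging on sequence_number."""
    peeked = []
    next_sequence = 0  # 0 is the documented default starting point
    while True:
        page = await receiver.peek_messages(
            max_message_count=page_size, sequence_number=next_sequence
        )
        if not page:
            return peeked
        peeked.extend(page)
        # Resume just past the last message seen so pages do not overlap.
        next_sequence = page[-1].sequence_number + 1
```

Because peeked messages are not locked, the list is only a snapshot: messages may be received or settled by other consumers while the loop runs.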

receive_deferred_messages

Receive messages that have previously been deferred.

When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.

async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parameters

Name Description
sequence_numbers
Required

A single sequence number, or a list of the sequence numbers, of messages that have been deferred.

Keyword-Only Parameters

Name Description
timeout

The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns

Type Description

A list of the received messages.

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Examples

Receive deferred messages from ServiceBus.


    async with servicebus_receiver:
        deferred_sequenced_numbers = []
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            # Record the sequence number; a deferred message can only be fetched by it.
            deferred_sequenced_numbers.append(message.sequence_number)
            print(str(message))
            await servicebus_receiver.defer_message(message)

        received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
            sequence_numbers=deferred_sequenced_numbers
        )

        for message in received_deferred_msg:
            await servicebus_receiver.complete_message(message)

receive_messages

Receive a batch of messages at once.

This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.

Note that the number of messages retrieved in a single batch will be dependent on whether prefetch_count was set for the receiver. If prefetch_count is not set for the receiver, the receiver would try to cache max_message_count (if provided) messages within the request to the service.

This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.

async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Parameters

Name Description
max_message_count

Maximum number of messages in the batch. Actual number returned will depend on prefetch_count size and incoming stream rate. Setting to None will fully depend on the prefetch config. The default value is 1.

Default value: 1
max_wait_time

Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session. Please use max_wait_time on the constructor to set the timeout for connecting to a session.

Default value: None

Returns

Type Description

A list of messages received. If no messages are available, this will be an empty list.

Exceptions

Type Description
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Examples

Receive messages from ServiceBus.


    async with servicebus_receiver:
        messages = await servicebus_receiver.receive_messages(max_wait_time=5)
        for message in messages:
            print(str(message))
            await servicebus_receiver.complete_message(message)
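Since a single call may return fewer messages than max_message_count, callers that want to empty an entity typically loop until an empty list comes back. The sketch below illustrates that loop; the helper name drain and its defaults are hypothetical, and the receiver is assumed to be open and in PEEK_LOCK mode.

```python
async def drain(receiver, batch_size=10, max_wait_time=5):
    """Receive and complete batches until a call returns no messages."""
    completed = 0
    while True:
        batch = await receiver.receive_messages(
            max_message_count=batch_size, max_wait_time=max_wait_time
        )
        # An empty list means nothing arrived within max_wait_time.
        if not batch:
            return completed
        for message in batch:
            await receiver.complete_message(message)
            completed += 1
```

Passing max_wait_time matters here: without it, the final call would not return until the connection is closed.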

renew_message_lock

Renew the message lock.

This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed.

In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed.

Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is also only available for non-sessionful messages.

async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Parameters

Name Description
message
Required

The message to renew the lock for.

Keyword-Only Parameters

Name Description
timeout

The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.

Returns

Type Description

The UTC datetime at which the lock is set to expire.

Exceptions

Type Description
TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Examples

Renew the lock on a received message.


    # Assumes an open receiver in PEEK_LOCK mode, called from within a coroutine.
    messages = await servicebus_receiver.receive_messages(max_wait_time=5)
    for message in messages:
        await servicebus_receiver.renew_message_lock(message)
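For work that may outlast the lock period, the lock has to be renewed while processing is still running. The sketch below shows one manual way to do that under stated assumptions: process_with_renewal, the work callback and the renew_every interval are hypothetical, and the interval is assumed to be shorter than the entity's lock duration. The auto_lock_renewer constructor parameter described earlier is the built-in alternative.

```python
import asyncio


async def process_with_renewal(receiver, message, work, renew_every=10.0):
    """Run work(message), renewing the message lock periodically until it finishes."""
    task = asyncio.ensure_future(work(message))
    try:
        while not task.done():
            # Wait for the work to finish, but no longer than one renewal interval.
            await asyncio.wait({task}, timeout=renew_every)
            if not task.done():
                # Still running: extend the lock before it can expire.
                await receiver.renew_message_lock(message)
    except BaseException:
        # Do not leave the work running if renewal fails or we are cancelled.
        task.cancel()
        raise
    result = task.result()  # re-raises here if work itself failed
    await receiver.complete_message(message)
    return result
```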

Attributes

client_identifier

Get the ServiceBusReceiver client identifier associated with the receiver instance.

Returns

Type Description
str

session

Get the ServiceBusSession object linked with the receiver. A session is only available for session-enabled entities; this returns None when called on a non-sessionful receiver.

Returns

Type Description

Examples

Get session from a receiver


    async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
        session = receiver.session