你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusReceiver Class
The ServiceBusReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription.
The two primary channels for message receipt are receive() to make a single request for messages, and async for message in receiver: to continuously receive incoming messages in an ongoing fashion.
Please use the get_<queue/subscription>_receiver method of ~azure.servicebus.aio.ServiceBusClient to create a ServiceBusReceiver instance.
- Inheritance
-
ServiceBusReceiverazure.servicebus.aio._base_handler_async.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
Constructor
ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
Parameters
Name | Description |
---|---|
fully_qualified_namespace
Required
|
The fully qualified host name for the Service Bus namespace. The namespace format is: .servicebus.windows.net. |
credential
Required
|
The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the *get_token(self, scopes) method, or alternatively, an AzureSasCredential can be provided too. |
Keyword-Only Parameters
Name | Description |
---|---|
queue_name
|
The path of specific Service Bus Queue the client connects to. |
topic_name
|
The path of specific Service Bus Topic which contains the Subscription the client connects to. |
subscription_name
|
The path of specific Service Bus Subscription under the specified Topic the client connects to. |
receive_mode
|
The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PEEK_LOCK. Default value: ServiceBusReceiveMode.PEEK_LOCK
|
max_wait_time
|
The timeout in seconds to wait for the first and subsequent messages to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. The default value is None, meaning no timeout. On a sessionful queue/topic when NEXT_AVAILABLE_SESSION is specified, this will act as the timeout for connecting to a session. If connection errors are occurring due to write timing out,the connection timeout value may need to be adjusted. See the socket_timeout optional parameter for more details. |
logging_enable
|
Whether to output network trace logs to the logger. Default is False. |
transport_type
|
The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp. |
http_proxy
|
HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'. |
user_agent
|
If specified, this will be added in front of the built-in user agent string. |
auto_lock_renewer
|
An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead. |
prefetch_count
|
The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive_messages would try to cache max_message_count (if provided) within its request to the service. WARNING: If prefetch_count > 0 and RECEIVE_AND_DELETE mode is used, all prefetched messages will stay in the in-memory prefetch buffer until they're received into the application. If the application ends before the messages are received into the application, those messages will be lost and unable to be recovered. Therefore, it's recommended that PEEK_LOCK mode be used with prefetch. |
client_identifier
|
A string-based identifier to uniquely identify the client instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated. |
socket_timeout
|
The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in. |
Variables
Name | Description |
---|---|
fully_qualified_namespace
|
The fully qualified host name for the Service Bus namespace. The namespace format is: .servicebus.windows.net. |
entity_path
|
The path of the entity that the client connects to. |
Methods
abandon_message |
Abandon the message. This message will be returned to the queue and made available to be received again. |
close | |
complete_message |
Complete the message. This removes the message from the queue. |
dead_letter_message |
Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue. |
defer_message |
Defers the message. This message will remain in the queue but must be requested specifically by its sequence number in order to be received. |
peek_messages |
Browse messages currently pending in the queue. Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered. |
receive_deferred_messages |
Receive messages that have previously been deferred. When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition. |
receive_messages |
Receive a batch of messages at once. This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call. Note that the number of messages retrieved in a single batch will be dependent on whether prefetch_count was set for the receiver. If prefetch_count is not set for the receiver, the receiver would try to cache max_message_count (if provided) messages within the request to the service. This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size. |
renew_message_lock |
Renew the message lock. This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed. In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed. Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is only available for non-sessionful messages as well. |
abandon_message
Abandon the message.
This message will be returned to the queue and made available to be received again.
async abandon_message(message: ServiceBusReceivedMessage) -> None
Parameters
Name | Description |
---|---|
message
Required
|
The received message to be abandoned. |
Returns
Type | Description |
---|---|
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
Examples
Abandon a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.abandon_message(message)
close
async close() -> None
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
complete_message
Complete the message.
This removes the message from the queue.
async complete_message(message: ServiceBusReceivedMessage) -> None
Parameters
Name | Description |
---|---|
message
Required
|
The received message to be completed. |
Returns
Type | Description |
---|---|
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
Examples
Complete a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.complete_message(message)
dead_letter_message
Move the message to the Dead Letter queue.
The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.
async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
Parameters
Name | Description |
---|---|
message
Required
|
The received message to be dead-lettered. |
reason
Required
|
The reason for dead-lettering the message. Default value: None
|
error_description
Required
|
The detailed error description for dead-lettering the message. Default value: None
|
Returns
Type | Description |
---|---|
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
Examples
Dead letter a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.dead_letter_message(message)
defer_message
Defers the message.
This message will remain in the queue but must be requested specifically by its sequence number in order to be received.
async defer_message(message: ServiceBusReceivedMessage) -> None
Parameters
Name | Description |
---|---|
message
Required
|
The received message to be deferred. |
Returns
Type | Description |
---|---|
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
Examples
Defer a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.defer_message(message)
peek_messages
Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered.
async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parameters
Name | Description |
---|---|
max_message_count
|
The maximum number of messages to try and peek. The default value is 1. Default value: 1
|
Keyword-Only Parameters
Name | Description |
---|---|
sequence_number
|
A message sequence number from which to start browsing messages. |
timeout
|
The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout. |
Returns
Type | Description |
---|---|
A list of ~azure.servicebus.ServiceBusReceivedMessage objects. |
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
Examples
Peek messages in the queue.
async with servicebus_receiver:
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
Receive messages that have previously been deferred.
When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.
async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parameters
Name | Description |
---|---|
sequence_numbers
Required
|
A list of the sequence numbers of messages that have been deferred. |
Keyword-Only Parameters
Name | Description |
---|---|
timeout
|
The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout. |
Returns
Type | Description |
---|---|
A list of the received messages. |
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
Examples
Receive deferred messages from ServiceBus.
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
await servicebus_receiver.defer_message(message)
received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for message in received_deferred_msg:
await servicebus_receiver.complete_message(message)
receive_messages
Receive a batch of messages at once.
This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.
Note that the number of messages retrieved in a single batch will be dependent on whether prefetch_count was set for the receiver. If prefetch_count is not set for the receiver, the receiver would try to cache max_message_count (if provided) messages within the request to the service.
This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.
async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
Parameters
Name | Description |
---|---|
max_message_count
|
Maximum number of messages in the batch. Actual number returned will depend on prefetch_count size and incoming stream rate. Setting to None will fully depend on the prefetch config. The default value is 1. Default value: 1
|
max_wait_time
|
Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned. NOTE: Setting max_wait_time on receive_messages when NEXT_AVAILABLE_SESSION is specified will not impact the timeout for connecting to a session. Please use max_wait_time on the constructor to set the timeout for connecting to a session. Default value: None
|
Returns
Type | Description |
---|---|
A list of messages received. If no messages are available, this will be an empty list. |
Exceptions
Type | Description |
---|---|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
|
|
azure.servicebus.exceptions.ServiceBusError when errors happen.
|
Examples
Receive messages from ServiceBus.
async with servicebus_receiver:
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
await servicebus_receiver.complete_message(message)
renew_message_lock
Renew the message lock.
This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed.
In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed.
Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is only available for non-sessionful messages as well.
async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
Parameters
Name | Description |
---|---|
message
Required
|
The message to renew the lock for. |
Keyword-Only Parameters
Name | Description |
---|---|
timeout
|
The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout. |
Returns
Type | Description |
---|---|
The utc datetime the lock is set to expire at. |
Exceptions
Type | Description |
---|---|
TypeError if the message is sessionful.
|
|
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
|
|
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.
|
Examples
Renew the lock on a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.renew_message_lock(message)
Attributes
client_identifier
Get the ServiceBusReceiver client identifier associated with the receiver instance.
Returns
Type | Description |
---|---|
session
Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled entities, it would return None if called on a non-sessionful receiver.
Returns
Type | Description |
---|---|
Examples
Get session from a receiver
async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session