你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
ServiceBusReceiver 类
ServiceBusReceiver 类定义了一个高级接口,用于从Azure 服务总线队列或主题订阅接收消息。
消息接收的两个主要通道是 receive () ,用于对消息发出单个请求, 以及接收方中的消息: 以持续的方式持续接收传入消息。
请使用 get_<queue/subscription>_receiver
~azure.servicebus.ServiceBusClient 方法创建 ServiceBusReceiver 实例。
- 继承
-
azure.servicebus._base_handler.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
构造函数
ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
参数
- credential
- TokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 azure 标识库生成的凭据对象,以及实现 *get_token (self、 scopes) 方法的对象,或者也可以提供 AzureSasCredential。
- queue_name
- str
客户端连接到的特定服务总线队列的路径。
- topic_name
- str
包含客户端连接到的订阅的特定服务总线主题的路径。
- subscription_name
- str
客户端连接到的指定主题下的特定服务总线订阅的路径。
- receive_mode
- Union[ServiceBusReceiveMode, str]
从实体检索消息的模式。 这两个选项是PEEK_LOCK和RECEIVE_AND_DELETE。 使用 PEEK_LOCK 接收的消息必须在给定的锁定期内解决,然后才能从队列中删除消息。 使用 RECEIVE_AND_DELETE 接收的消息将立即从队列中删除,如果客户端无法处理消息,则无法随后放弃或重新接收消息。 默认模式为PEEK_LOCK。
- logging_enable
- bool
是否将网络跟踪日志输出到记录器。 默认值为 False。
- transport_type
- TransportType
将用于与服务总线服务通信的传输协议的类型。 默认值为 TransportType.Amqp。
- http_proxy
- Dict
HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。
- user_agent
- str
如果指定,则会将其添加到内置用户代理字符串的前面。
- auto_lock_renewer
- Optional[AutoLockRenewer]
可以提供 ~azure.servicebus.AutoLockRenewer,以便消息在收到时自动注册。 如果接收方是会话接收器,它将改为应用于会话。
- prefetch_count
- int
每次向服务发出请求时要缓存的最大消息数。 此设置仅用于高级性能优化。 增大此值将提高消息吞吐量性能,但如果处理速度不够快,则会增加消息在缓存时过期的可能性。 默认值为 0,这意味着将从服务接收消息,并一次处理一个消息。 如果prefetch_count为 0,ServiceBusReceiver.receive 将尝试缓存 max_message_count ((如果) 请求中向服务提供)。
- client_identifier
- str
用于唯一标识客户端实例的基于字符串的标识符。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。
- socket_timeout
- float
在超时之前,连接上的基础套接字在发送和接收数据时应等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。
变量
- fully_qualified_namespace
- str
服务总线命名空间的完全限定主机名。 命名空间格式为: .servicebus.windows.net。
- entity_path
- str
客户端连接到的实体的路径。
方法
abandon_message |
放弃消息。 此消息将返回到队列,并可供再次接收。 |
close | |
complete_message |
完成消息。 这会从队列中删除消息。 |
dead_letter_message |
将消息移动到死信队列。 死信队列是一个子队列,可用于存储无法正确处理的消息,或者需要进一步检查或处理的消息。 还可以将队列配置为将过期的消息发送到死信队列。 |
defer_message |
延迟消息。 此消息将保留在队列中,但必须按其序列号专门请求才能接收。 |
next | |
peek_messages |
浏览队列中当前挂起的消息。 不会从队列中删除速览消息,也不会将其锁定。 它们不能完成、延迟或死信。 |
receive_deferred_messages |
接收以前已延迟的消息。 从分区实体接收延迟的消息时,提供的所有序列号必须是来自同一分区的消息。 |
receive_messages |
一次接收一批消息。 如果要同时处理多条消息,或者以单个调用的形式执行即席接收,则此方法是最佳选择。 请注意,在单个批处理中检索的消息数取决于是否为接收方设置了 prefetch_count 。 如果未为接收方设置 prefetch_count ,则接收方将尝试缓存max_message_count ((如果向服务提供) 请求中的消息)。 此调用优先于满足指定批大小的快速返回,因此在收到至少一条消息并且传入消息中存在间隙时,无论指定的批大小如何,都将立即返回。 |
renew_message_lock |
续订消息锁。 这将保持消息的锁定,以确保不会将其返回到队列中以重新处理。 若要完成 (或以其他方式解决消息) ,必须维护锁,并且不能已过期:无法续订过期的锁。 通过RECEIVE_AND_DELETE模式接收的消息未锁定,因此无法续订。 此操作也仅适用于非会话消息。 |
abandon_message
放弃消息。
此消息将返回到队列,并可供再次接收。
abandon_message(message: ServiceBusReceivedMessage) -> None
参数
返回类型
例外
示例
放弃收到的消息。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.abandon_message(message)
close
close() -> None
例外
complete_message
完成消息。
这会从队列中删除消息。
complete_message(message: ServiceBusReceivedMessage) -> None
参数
返回类型
例外
示例
完成收到的消息。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.complete_message(message)
dead_letter_message
将消息移动到死信队列。
死信队列是一个子队列,可用于存储无法正确处理的消息,或者需要进一步检查或处理的消息。 还可以将队列配置为将过期的消息发送到死信队列。
dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
参数
返回类型
例外
示例
收到的消息的死信。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.dead_letter_message(message)
defer_message
延迟消息。
此消息将保留在队列中,但必须按其序列号专门请求才能接收。
defer_message(message: ServiceBusReceivedMessage) -> None
参数
返回类型
例外
示例
延迟收到的消息。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.defer_message(message)
next
next()
例外
peek_messages
浏览队列中当前挂起的消息。
不会从队列中删除速览消息,也不会将其锁定。 它们不能完成、延迟或死信。
peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
参数
- sequence_number
- int
从中开始浏览消息的消息序列号。
返回
~azure.servicebus.ServiceBusReceivedMessage 的列表。
返回类型
例外
示例
查看队列中的挂起消息。
with servicebus_receiver:
messages = servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
接收以前已延迟的消息。
从分区实体接收延迟的消息时,提供的所有序列号必须是来自同一分区的消息。
receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
参数
返回
请求的 ~azure.servicebus.ServiceBusReceivedMessage 实例的列表。
返回类型
例外
示例
从 ServiceBus 接收延迟的消息。
with servicebus_receiver:
deferred_sequenced_numbers = []
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
servicebus_receiver.defer_message(message)
received_deferred_msg = servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for msg in received_deferred_msg:
servicebus_receiver.complete_message(msg)
receive_messages
一次接收一批消息。
如果要同时处理多条消息,或者以单个调用的形式执行即席接收,则此方法是最佳选择。
请注意,在单个批处理中检索的消息数取决于是否为接收方设置了 prefetch_count 。 如果未为接收方设置 prefetch_count ,则接收方将尝试缓存max_message_count ((如果向服务提供) 请求中的消息)。
此调用优先于满足指定批大小的快速返回,因此在收到至少一条消息并且传入消息中存在间隙时,无论指定的批大小如何,都将立即返回。
receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
参数
批中的最大消息数。 返回的实际数量取决于prefetch_count和传入流速率。 设置为“无”将完全取决于预提取配置。默认值为 1。
等待第一条消息到达的最长时间(以秒为单位)。 如果没有消息到达,并且未指定超时,则此调用在连接关闭之前不会返回。 如果指定,则超时期限内没有消息到达,将返回空列表。
返回
收到的消息列表。 如果没有可用的消息,则此列表为空列表。
返回类型
例外
示例
从 ServiceBus 接收消息。
with servicebus_receiver:
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
servicebus_receiver.complete_message(message)
renew_message_lock
续订消息锁。
这将保持消息的锁定,以确保不会将其返回到队列中以重新处理。
若要完成 (或以其他方式解决消息) ,必须维护锁,并且不能已过期:无法续订过期的锁。
通过RECEIVE_AND_DELETE模式接收的消息未锁定,因此无法续订。 此操作也仅适用于非会话消息。
renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
参数
返回
锁定设置为过期的 utc 日期时间。
返回类型
例外
示例
对收到的消息续订锁。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.renew_message_lock(message)
属性
client_identifier
session
获取与接收方链接的 ServiceBusSession 对象。 会话仅适用于已启用会话的实体,如果在非会话接收方上调用,它将返回 None。
返回类型
示例
从接收方获取会话
with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session