你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
EventHubConsumerClient 类
EventHubConsumerClient 类定义一个高级接口,用于从Azure 事件中心服务接收事件。
EventHubConsumerClient main目标是通过负载均衡和检查点从 EventHub 的所有分区接收事件。
当多个 EventHubConsumerClient 实例针对同一个事件中心、使用者组和检查点位置运行时,分区将在其中均匀分布。
若要启用负载均衡和持久化检查点,必须在创建 EventHubConsumerClient 时设置checkpoint_store。 如果未提供检查点存储,该检查点将在内存中内部维护。
当调用 EventHubConsumerClient 方法 receive () 或 receive_batch () 并指定partition_id时,EventHubConsumerClient 也可以从特定分区接收。 负载均衡在单分区模式下不起作用。 但如果设置了checkpoint_store,用户仍然可以保存检查点。
- 继承
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
构造函数
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
参数
- credential
- AsyncTokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 EventHubSharedKeyCredential由 azure 标识库生成的 、 或凭据对象,以及实现 *get_token (self、 scopes) 方法的对象。
- logging_enable
- bool
是否将网络跟踪日志输出到记录器。 默认值为 False。
- auth_timeout
- float
等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。
- user_agent
- str
如果指定,则会将其添加到用户代理字符串的前面。
- retry_total
- int
发生错误时恢复失败操作的尝试总数。 默认值为 3。 接收中 retry_total 的上下文很特殊: 接收 方法是通过每次迭代中调用内部接收方法的 while 循环实现的。 在 接收 情况下, retry_total 指定 while 循环中内部接收方法引发的错误后重试次数。 如果重试尝试已用尽,则将调用 on_error 回调 ((如果) 提供错误信息)。 如果提供 ) ,则将关闭失败的内部分区使用者, (将调用on_partition_close并创建新的内部分区使用者, (如果提供 on_partition_initialize) 来恢复接收,则将调用on_partition_initialize。
- retry_backoff_factor
- float
第二次尝试后在尝试之间应用的退让因素 (大多数错误通过第二次尝试立即解决,无需延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略将休眠: {backoff factor} * (2 ** ({总重试次数} - 1) ) 秒。 如果backoff_factor为 0.1,则重试将在两次重试之间休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。
- retry_backoff_max
- float
最长回退时间。 默认值为 120 秒 (2 分钟) 。
- retry_mode
- str
重试尝试之间的延迟行为。 支持的值是“fixed”或“exponential”,其中默认值为“exponential”。
- idle_timeout
- float
超时(以秒为单位),之后,如果没有其他活动,此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。
- transport_type
- TransportType
将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket 。
- http_proxy
HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。
- checkpoint_store
- Optional[CheckpointStore]
一个管理器,用于在接收事件时存储分区负载均衡和检查点数据。 检查点存储将用于从所有分区或单个分区接收的两种情况。 在后一种情况下,负载均衡不适用。 如果未提供检查点存储,检查点将在内存中内部维护, EventHubConsumerClient 实例将接收不进行负载均衡的事件。
- load_balancing_interval
- float
当负载均衡启动时。 这是两次负载均衡评估之间的间隔(以秒为单位)。 默认值为 30 秒。
- partition_ownership_expiration_interval
- float
分区所有权将在该秒数后过期。 每次负载均衡评估都会自动延长所有权过期时间。 默认值为 6 * load_balancing_interval,即使用 30 秒的默认load_balancing_interval时为 180 秒。
- load_balancing_strategy
- str 或 LoadBalancingStrategy
启动负载均衡时,它将使用此策略来声明和平衡分区所有权。 将“greedy”或 LoadBalancingStrategy.GREEDY 用于贪婪策略,对于每次负载均衡评估,该策略将获取均衡负载所需的任意数量的未认领分区。 对于均衡策略,请使用“balanced”或 LoadBalancingStrategy.BALANCED ,该策略每次进行负载均衡评估时,只声明其他 EventHubConsumerClient 未声明的一个分区。 如果 EventHub 的所有分区都由其他 EventHubConsumerClient 声明,并且此客户端声明的分区太少,则无论负载均衡策略如何,此客户端都将从其他客户端中窃取一个分区进行每次负载均衡评估。 默认情况下使用贪婪策略。
用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address中指定端口,则默认使用端口 443。
SSL 证书的自定义CA_BUNDLE文件的路径,用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where () 。
- uamqp_transport
- bool
是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。
- socket_timeout
- float
连接上的基础套接字在发送和接收数据时应在超时之前等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果 EventHubsConnectionError 错误由于写入超时而发生,则可能需要传入大于默认值的值。 这适用于高级使用方案,通常默认值应足够。
示例
创建 EventHubConsumerClient 的新实例。
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
方法
close |
停止从事件中心检索事件,并关闭基础 AMQP 连接和链接。 |
from_connection_string |
从连接字符串创建 EventHubConsumerClient。 |
get_eventhub_properties |
获取事件中心的属性。 返回的字典中的键包括:
|
get_partition_ids |
获取事件中心的分区 ID。 |
get_partition_properties |
获取指定分区的属性。 属性字典中的键包括:
|
receive |
使用可选的负载均衡和检查点从分区 () 接收事件。 |
receive_batch |
使用可选的负载均衡和检查点从分区 () 分批接收事件。 |
close
停止从事件中心检索事件,并关闭基础 AMQP 连接和链接。
async close() -> None
返回类型
示例
关闭客户端。
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
从连接字符串创建 EventHubConsumerClient。
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
参数
- eventhub_name
- str
要将客户端连接到的特定事件中心的路径。
- logging_enable
- bool
是否将网络跟踪日志输出到记录器。 默认值为 False。
- http_proxy
- dict
HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。
- auth_timeout
- float
等待令牌由服务授权的时间(以秒为单位)。 默认值为 60 秒。 如果设置为 0,则不会从客户端强制实施超时。
- user_agent
- str
如果指定,则会将其添加到用户代理字符串的前面。
- retry_total
- int
发生错误时恢复失败操作的尝试总数。 默认值为 3。 接收中 retry_total 的上下文很特殊: 接收 方法是通过每次迭代中调用内部接收方法的 while 循环实现的。 在 接收 情况下, retry_total 指定 while 循环中内部接收方法引发的错误后重试次数。 如果重试尝试已用尽,则将调用 on_error 回调 ((如果) 提供错误信息)。 如果提供 ) ,则将关闭失败的内部分区使用者, (将调用on_partition_close并创建新的内部分区使用者, (如果提供 on_partition_initialize) 来恢复接收,则将调用on_partition_initialize。
- retry_backoff_factor
- float
第二次尝试后在尝试之间应用的退让因素 (大多数错误通过第二次尝试立即解决,无需延迟) 。 在固定模式下,重试策略将始终为 {backoff factor} 休眠。 在“指数”模式下,重试策略将休眠: {backoff factor} * (2 ** ({总重试次数} - 1) ) 秒。 如果backoff_factor为 0.1,则重试将在两次重试之间休眠 [0.0s, 0.2s, 0.4s, ...]。 默认值为 0.8。
- retry_backoff_max
- float
最长回退时间。 默认值为 120 秒 (2 分钟) 。
- retry_mode
- str
重试尝试之间的延迟行为。 支持的值是“fixed”或“exponential”,其中默认值为“exponential”。
- idle_timeout
- float
超时(以秒为单位),之后,如果没有其他活动,此客户端将关闭基础连接。 默认情况下,该值为 None,这意味着除非由服务启动,否则客户端不会因为处于非活动状态而关闭。
- transport_type
- TransportType
将用于与事件中心服务通信的传输协议的类型。 默认值为 TransportType.Amqp ,在这种情况下,使用端口 5671。 如果端口 5671 在网络环境中不可用/被阻止,则可改用使用端口 443 进行通信的 TransportType.AmqpOverWebsocket 。
- checkpoint_store
- Optional[CheckpointStore]
一个管理器,用于在接收事件时存储分区负载均衡和检查点数据。 检查点存储将用于从所有分区或单个分区接收的两种情况。 在后一种情况下,负载均衡不适用。 如果未提供检查点存储,检查点将在内存中内部维护, EventHubConsumerClient 实例将接收不进行负载均衡的事件。
- load_balancing_interval
- float
当负载均衡启动时。 这是两次负载均衡评估之间的间隔(以秒为单位)。 默认值为 30 秒。
- partition_ownership_expiration_interval
- float
分区所有权将在该秒数后过期。 每次负载均衡评估都会自动延长所有权过期时间。 默认值为 6 * load_balancing_interval,即使用 30 秒的默认load_balancing_interval时为 180 秒。
- load_balancing_strategy
- str 或 LoadBalancingStrategy
启动负载均衡时,它将使用此策略来声明和平衡分区所有权。 将“greedy”或 LoadBalancingStrategy.GREEDY 用于贪婪策略,对于每次负载均衡评估,该策略将获取均衡负载所需的任意数量的未认领分区。 对于均衡策略,请使用“balanced”或 LoadBalancingStrategy.BALANCED ,该策略每次进行负载均衡评估时,只声明其他 EventHubConsumerClient 未声明的一个分区。 如果 EventHub 的所有分区都由其他 EventHubConsumerClient 声明,并且此客户端声明的分区太少,则无论负载均衡策略如何,此客户端都将从其他客户端中窃取一个分区进行每次负载均衡评估。 默认情况下使用贪婪策略。
用于与事件中心服务建立连接的自定义终结点地址,允许通过任何应用程序网关或主机环境所需的其他路径路由网络请求。 默认值为 None。 格式类似于“sb://< custom_endpoint_hostname>:<custom_endpoint_port>”。 如果未在 custom_endpoint_address中指定端口,则默认使用端口 443。
SSL 证书的自定义CA_BUNDLE文件的路径,用于对连接终结点的标识进行身份验证。 默认值为 None,在这种情况下将使用 certifi.where () 。
- uamqp_transport
- bool
是否使用 uamqp 库作为基础传输。 默认值为 False,纯 Python AMQP 库将用作基础传输。
返回类型
示例
从 连接字符串 创建 EventHubConsumerClient 的新实例。
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
获取事件中心的属性。
返回的字典中的键包括:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
返回
包含有关事件中心信息的字典。
返回类型
例外
get_partition_ids
get_partition_properties
获取指定分区的属性。
属性字典中的键包括:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
参数
返回
包含分区属性的字典。
返回类型
例外
receive
使用可选的负载均衡和检查点从分区 () 接收事件。
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
参数
- on_event
- Callable[PartitionContext, Optional[EventData]]
用于处理收到的事件的回调函数。 回调采用两个参数:包含分区上下文的partition_context和接收的事件。 回调函数的定义应如下所示: on_event (partition_context、事件) 。 有关详细的分区上下文信息,请参阅 PartitionContext。
- max_wait_time
- float
事件处理程序在调用回调之前等待的最大间隔(秒)。 如果在此间隔内未收到任何事件,则将使用 None 调用on_event回调。 如果此值设置为 None 或 0 (默认) ,则在收到事件之前不会调用回调。
- partition_id
- str
如果指定,客户端将仅从此分区接收。 否则,客户端将从所有分区接收。
- owner_level
- int
独占使用者的优先级。 如果设置了owner_level,则会创建独占使用者。 具有较高owner_level的使用者具有较高的独占优先级。 所有者级别也称为使用者的“纪元值”。
- prefetch
- int
要从服务中预提取以进行处理的事件数。 默认值为 300。
- track_last_enqueued_event_properties
- bool
指示使用者是否应请求有关其关联分区上最后排队的事件的信息,并在收到事件时跟踪该信息。 跟踪有关最后排队事件的分区信息时,从事件中心服务接收的每个事件都将携带有关分区的元数据。 这会导致少量额外的网络带宽消耗,在考虑与使用事件中心客户端定期发出分区属性请求时,这通常是一种有利的权衡。 默认情况下,它设置为 False 。
如果没有分区的检查点数据,请从此事件位置开始接收。 如果可用,将使用检查点数据。 这可以是分区 ID 作为键、位置作为单个分区的值的听写,也可以是所有分区的单个值。 值类型可以是 str、int 或 datetime.datetime。 还支持从流开头接收的值“-1”和“@latest”,用于仅接收新事件。
确定给定starting_position是否为非独占 (>=) (>) 。 对于非独占,为 True,对于独占,则为 False。 这可以是分区 ID 作为键,bool 作为值的听写,该值指示特定分区的starting_position是否为非独占分区。 对于所有starting_position,也可以是单个布尔值。 默认值为 False。
- on_error
- Callable[[PartitionContext, Exception]]
在重试尝试用尽后接收期间或负载均衡过程中引发错误时调用的回调函数。 回调采用两个参数: 包含 分区信息的partition_context和 异常错误 。 如果在负载均衡过程中引发错误,partition_context可能为 None。 回调的定义应如下所示: on_error (partition_context、错误) 。 如果在 on_event 回调期间引发未经处理的异常,也会调用 on_error 回调。
- on_partition_initialize
- Callable[[PartitionContext]]
在某个分区的使用者完成初始化后将调用的回调函数。 创建新的内部分区使用者以接管失败且已关闭的内部分区使用者的接收进程时,也会调用它。 回调采用单个参数: partition_context ,其中包含分区信息。 回调的定义应如下所示: on_partition_initialize (partition_context) 。
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
关闭某个分区的使用者后将调用的回调函数。 在重试尝试用尽后接收期间引发错误时,也会调用它。 回调采用两个参数: partition_context ,其中包含分区信息和关闭 原因 。 回调的定义应如下所示: on_partition_close (partition_context、原因) 。 有关各种结束原因, CloseReason 请参阅 。
返回类型
示例
从 EventHub 接收事件。
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
使用可选的负载均衡和检查点从分区 () 分批接收事件。
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
参数
- on_event_batch
- Callable[PartitionContext, List[EventData]]
用于处理一批已接收事件的回调函数。 回调采用两个参数:包含分区上下文 的partition_context 和 event_batch(即接收的事件)。 回调函数的定义应如下所示: on_event_batch (partition_context、event_batch) 。 如果max_wait_time不为 None 或 0,并且max_wait_time后未收到任何事件,则event_batch可能是空列表。 有关详细的分区上下文信息,请参阅 PartitionContext。
- max_batch_size
- int
批处理中传递到回调 on_event_batch的最大事件数。 如果实际接收的事件数大于 max_batch_size,则接收的事件将分成多个批,并为每个批调用回调,最多 max_batch_size 个事件。
- max_wait_time
- float
事件处理程序在调用回调之前等待的最大间隔(以秒为单位)。 如果在此间隔内未收到任何事件,则将使用空列表调用 on_event_batch 回调。 如果此值设置为 None 或 0 (默认) ,则在收到事件之前不会调用回调。
- partition_id
- str
如果指定,客户端将仅接收来自此分区的 。 否则,客户端将从所有分区接收 。
- owner_level
- int
独占使用者的优先级。 如果设置了owner_level,将创建独占使用者。 具有较高owner_level的使用者具有较高的独占优先级。 所有者级别也称为使用者的“纪元值”。
- prefetch
- int
要从服务中预提取以进行处理的事件数。 默认值为 300。
- track_last_enqueued_event_properties
- bool
指示使用者是否应请求有关其关联分区上最后排队事件的信息,并在接收事件时跟踪该信息。 跟踪有关最后排队事件的分区信息时,从事件中心服务接收的每个事件都将携带有关分区的元数据。 这会导致少量额外的网络带宽消耗,在考虑使用事件中心客户端定期请求分区属性时,这通常是一个有利的权衡。 默认情况下,它设置为 False 。
如果没有分区的检查点数据,请从此事件位置开始接收 。 如果可用,将使用检查点数据。 这可以是一个 dict,其中分区 ID 作为键,位置作为单个分区的值,或者所有分区的单个值。 值类型可以是 str、int 或 datetime.datetime。 还支持值“-1”用于从流开头接收,“@latest”用于仅接收新事件。
确定给定starting_position是否为非独占 (>=) (>) 。 True 表示非独占,False 表示独占。 这可以是一个 dict,其中分区 ID 为键,bool 作为值,指示特定分区的starting_position是否包含。 这也可以是所有starting_position的单个布尔值。 默认值为 False。
- on_error
- Callable[[PartitionContext, Exception]]
在重试尝试用尽后或在负载均衡过程中接收期间引发错误时调用的回调函数。 回调采用两个参数: partition_context ,其中包含分区信息, 错误 是异常。 如果在负载均衡过程中引发错误,partition_context可能为 None。 回调的定义应如下所示: on_error (partition_context、错误) 。 如果在 on_event 回调期间引发未经处理的异常,也会调用 on_error 回调。
- on_partition_initialize
- Callable[[PartitionContext]]
在某个分区的使用者完成初始化后调用的回调函数。 当创建新的内部分区使用者以接管失败和已关闭的内部分区使用者的接收进程时,也会调用它。 回调采用单个参数: partition_context ,其中包含分区信息。 回调的定义应如下所示: on_partition_initialize (partition_context) 。
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
关闭特定分区的使用者后将调用的回调函数。 在重试尝试用尽后,在接收过程中引发错误时,也会调用它。 回调采用两个参数: partition_context ,其中包含分区信息和关闭 原因 。 回调的定义应如下所示: on_partition_close (partition_context、reason) 。 有关各种关闭原因,请参阅 CloseReason 。
返回类型
示例
从 EventHub 分批接收事件。
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)