EventHubConsumerClient Clase
La clase EventHubConsumerClient define una interfaz de alto nivel para recibir eventos del servicio Azure Event Hubs.
El objetivo principal de EventHubConsumerClient es recibir eventos de todas las particiones de un EventHub con equilibrio de carga y puntos de comprobación.
Cuando se ejecutan varias instancias de EventHubConsumerClient en el mismo centro de eventos, el grupo de consumidores y la ubicación de puntos de control, las particiones se distribuirán uniformemente entre ellos.
Para habilitar el equilibrio de carga y los puntos de control persistentes, checkpoint_store debe establecerse al crear EventHubConsumerClient. Si no se proporciona un almacén de puntos de control, el punto de control se mantendrá internamente en la memoria.
Un EventHubConsumerClient también puede recibir de una partición específica al llamar a su método receive() o receive_batch() y especificar el partition_id. El equilibrio de carga no funcionará en modo de partición única. Pero los usuarios todavía pueden guardar puntos de control si se establece el checkpoint_store.
- Herencia
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Constructor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parámetros
- fully_qualified_namespace
- str
Nombre de host completo para el espacio de nombres de Event Hubs. El formato del espacio de nombres es: .servicebus.windows.net.
- eventhub_name
- str
Ruta de acceso del centro de eventos específico al que se va a conectar el cliente.
- credential
- AsyncTokenCredential o AzureSasCredential o AzureNamedKeyCredential
Objeto de credencial usado para la autenticación que implementa una interfaz determinada para obtener tokens. Acepta objetos de credenciales EventHubSharedKeyCredentialo generados por la biblioteca azure-identity y los objetos que implementan el método *get_token(self, scopes).
- logging_enable
- bool
Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.
- auth_timeout
- float
Tiempo en segundos para esperar a que el servicio autorice un token. El valor predeterminado es de 60 segundos. Si se establece en 0, no se aplicará ningún tiempo de espera desde el cliente.
- user_agent
- str
Si se especifica, se agregará delante de la cadena del agente de usuario.
- retry_total
- int
Número total de intentos de rehacer una operación con error cuando se produce un error. El valor predeterminado es 3. El contexto de retry_total en la recepción es especial: el método receive se implementa mediante un método de recepción interno que llama al bucle while en cada iteración. En el caso de recepción , retry_total especifica los números de reintento después del error generado por el método de recepción interno en el bucle while. Si se agotan los reintentos, se llamará a la devolución de llamada on_error (si se proporciona) con la información de error. Se cerrará el consumidor de partición interna con error (se llamará a on_partition_close si se proporciona) y se creará un nuevo consumidor de partición interna (se llamará on_partition_initialize si se proporciona) para reanudar la recepción.
- retry_backoff_factor
- float
Un factor de retroceso que se va a aplicar entre los intentos después del segundo intento (la mayoría de los errores se resuelven inmediatamente mediante un segundo intento sin retraso). En modo fijo, la directiva de reintento siempre se suspenderá para {factor de retroceso}. En el modo "exponencial", la directiva de reintento se suspenderá durante: {factor de retroceso} * (2 ** ({número de reintentos totales} - 1)) segundos. Si el backoff_factor es 0,1, el reintento se suspenderá para [0.0s, 0.2s, 0.4s, ...] entre reintentos. El valor predeterminado es 0,8.
- retry_backoff_max
- float
Tiempo de retroceso máximo. El valor predeterminado es 120 segundos (2 minutos).
- retry_mode
- str
Comportamiento de retraso entre reintentos. Los valores admitidos son "fijo" o "exponencial", donde el valor predeterminado es "exponencial".
- idle_timeout
- float
Tiempo de espera, en segundos, después del cual este cliente cerrará la conexión subyacente si no hay ninguna actividad adicional. De forma predeterminada, el valor es None, lo que significa que el cliente no se apagará debido a la inactividad a menos que el servicio lo inicie.
- transport_type
- TransportType
Tipo de protocolo de transporte que se usará para comunicarse con el servicio Event Hubs. El valor predeterminado es TransportType.Amqp en cuyo caso se usa el puerto 5671. Si el puerto 5671 no está disponible o bloqueado en el entorno de red, TransportType.AmqpOverWebsocket podría usarse en su lugar, que usa el puerto 443 para la comunicación.
- http_proxy
Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int).
- checkpoint_store
- Optional[CheckpointStore]
Administrador que almacena los datos de equilibrio de carga y punto de comprobación de la partición al recibir eventos. El almacén de puntos de control se usará en ambos casos de recepción de todas las particiones o de una sola partición. En este último caso, no se aplica el equilibrio de carga. Si no se proporciona un almacén de puntos de control, el punto de control se mantendrá internamente en la memoria y la instancia de EventHubConsumerClient recibirá eventos sin equilibrio de carga.
- load_balancing_interval
- float
Cuando se inicia el equilibrio de carga. Este es el intervalo, en segundos, entre dos evaluaciones de equilibrio de carga. El valor predeterminado es 30 segundos.
- partition_ownership_expiration_interval
- float
Una propiedad de partición expirará después de este número de segundos. Cada evaluación de equilibrio de carga extenderá automáticamente el tiempo de expiración de la propiedad. El valor predeterminado es 6 * load_balancing_interval, es decir, 180 segundos al usar el load_balancing_interval predeterminado de 30 segundos.
- load_balancing_strategy
- str o LoadBalancingStrategy
Cuando se inicie el equilibrio de carga, usará esta estrategia para reclamar y equilibrar la propiedad de la partición. Use "expansible" o LoadBalancingStrategy.GREEDY para la estrategia expansible, que, para cada evaluación de equilibrio de carga, capturará tantas particiones no reclamadas necesarias para equilibrar la carga. Use "equilibrado" o LoadBalancingStrategy.BALANCED para la estrategia equilibrada, que, para cada evaluación de equilibrio de carga, solo reclama una partición que no sea reclamada por otros EventHubConsumerClient. Si otras particiones de Un EventHub son reclamadas por otros EventHubConsumerClient y este cliente ha reclamado demasiadas particiones, este cliente robará una partición de otros clientes para cada evaluación de equilibrio de carga, independientemente de la estrategia de equilibrio de carga. La estrategia expansa se usa de forma predeterminada.
La dirección del punto de conexión personalizado que se va a usar para establecer una conexión con el servicio Event Hubs, lo que permite enrutar las solicitudes de red a través de las puertas de enlace de aplicaciones u otras rutas de acceso necesarias para el entorno host. El valor predeterminado es None. El formato sería "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Si no se especifica el puerto en el custom_endpoint_address, se usará el puerto 443 de forma predeterminada.
Ruta de acceso al archivo de CA_BUNDLE personalizado del certificado SSL que se usa para autenticar la identidad del punto de conexión. El valor predeterminado es None en cuyo caso se usará certifi.where().
- uamqp_transport
- bool
Si se debe usar la biblioteca uamqp como transporte subyacente. El valor predeterminado es False y la biblioteca AMQP de Python pura se usará como transporte subyacente.
- socket_timeout
- float
Tiempo en segundos que el socket subyacente de la conexión debe esperar al enviar y recibir datos antes de que se agote el tiempo de espera. El valor predeterminado es 0.2 para TransportType.Amqp y 1 para TransportType.AmqpOverWebsocket. Si se producen errores de EventHubsConnectionError debido al tiempo de espera de escritura, es posible que sea necesario pasar un valor mayor que el predeterminado. Esto es para escenarios de uso avanzado y normalmente el valor predeterminado debe ser suficiente.
Ejemplos
Cree una nueva instancia de EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Métodos
close |
Detenga la recuperación de eventos del centro de eventos y cierre la conexión y los vínculos de AMQP subyacentes. |
from_connection_string |
Cree un EventHubConsumerClient a partir de un cadena de conexión. |
get_eventhub_properties |
Obtiene las propiedades del centro de eventos. Las claves del diccionario devuelto incluyen:
|
get_partition_ids |
Obtiene los identificadores de partición del centro de eventos. |
get_partition_properties |
Obtiene las propiedades de la partición especificada. Las claves del diccionario de propiedades incluyen:
|
receive |
Reciba eventos de particiones, con equilibrio de carga opcional y puntos de control. |
receive_batch |
Reciba eventos de particiones en lotes, con equilibrio de carga opcional y puntos de control. |
close
Detenga la recuperación de eventos del centro de eventos y cierre la conexión y los vínculos de AMQP subyacentes.
async close() -> None
Tipo de valor devuelto
Ejemplos
Cierre el cliente.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
Cree un EventHubConsumerClient a partir de un cadena de conexión.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Parámetros
- eventhub_name
- str
Ruta de acceso del centro de eventos específico al que se va a conectar el cliente.
- logging_enable
- bool
Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.
- http_proxy
- dict
Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int). Además, las claves siguientes también pueden estar presentes: "username", "password".
- auth_timeout
- float
Tiempo en segundos para esperar a que el servicio autorice un token. El valor predeterminado es de 60 segundos. Si se establece en 0, no se aplicará ningún tiempo de espera desde el cliente.
- user_agent
- str
Si se especifica, se agregará delante de la cadena del agente de usuario.
- retry_total
- int
Número total de intentos de rehacer una operación con error cuando se produce un error. El valor predeterminado es 3. El contexto de retry_total en la recepción es especial: el método receive se implementa mediante un método de recepción interno que llama al bucle while en cada iteración. En el caso de recepción , retry_total especifica los números de reintento después del error generado por el método de recepción interno en el bucle while. Si se agotan los reintentos, se llamará a la devolución de llamada on_error (si se proporciona) con la información de error. Se cerrará el consumidor de partición interna con error (se llamará a on_partition_close si se proporciona) y se creará un nuevo consumidor de partición interna (se llamará on_partition_initialize si se proporciona) para reanudar la recepción.
- retry_backoff_factor
- float
Un factor de retroceso que se va a aplicar entre los intentos después del segundo intento (la mayoría de los errores se resuelven inmediatamente mediante un segundo intento sin retraso). En el modo fijo, la directiva de reintento siempre se suspenderá para {factor de retroceso}. En el modo "exponencial", la directiva de reintento se suspenderá para: {factor de retroceso} * (2 ** ({número de reintentos totales} - 1)) segundos. Si el backoff_factor es 0.1, el reintento se suspenderá para [0.0s, 0.2s, 0.4s, ...] entre reintentos. El valor predeterminado es 0,8.
- retry_backoff_max
- float
Tiempo de retroceso máximo. El valor predeterminado es 120 segundos (2 minutos).
- retry_mode
- str
Comportamiento de retraso entre los reintentos. Los valores admitidos son "fijo" o "exponencial", donde el valor predeterminado es "exponencial".
- idle_timeout
- float
Tiempo de espera, en segundos, después del cual este cliente cerrará la conexión subyacente si no hay ninguna actividad adicional. De forma predeterminada, el valor es None, lo que significa que el cliente no se apagará debido a inactividad a menos que el servicio lo inicie.
- transport_type
- TransportType
Tipo de protocolo de transporte que se usará para comunicarse con el servicio Event Hubs. El valor predeterminado es TransportType.Amqp en cuyo caso se usa el puerto 5671. Si el puerto 5671 no está disponible o bloqueado en el entorno de red, TransportType.AmqpOverWebsocket podría usarse en su lugar, que usa el puerto 443 para la comunicación.
- checkpoint_store
- Optional[CheckpointStore]
Administrador que almacena los datos de equilibrio de carga de partición y punto de control al recibir eventos. El almacén de puntos de comprobación se usará en ambos casos de recepción de todas las particiones o de una sola partición. En este último caso, no se aplica el equilibrio de carga. Si no se proporciona un almacén de puntos de control, el punto de control se mantendrá internamente en memoria y la instancia de EventHubConsumerClient recibirá eventos sin equilibrio de carga.
- load_balancing_interval
- float
Cuando se inicia el equilibrio de carga. Este es el intervalo, en segundos, entre dos evaluaciones de equilibrio de carga. El valor predeterminado es 30 segundos.
- partition_ownership_expiration_interval
- float
Una propiedad de partición expirará después de este número de segundos. Cada evaluación de equilibrio de carga extenderá automáticamente el tiempo de expiración de la propiedad. El valor predeterminado es 6 * load_balancing_interval, es decir, 180 segundos cuando se usa el load_balancing_interval predeterminado de 30 segundos.
- load_balancing_strategy
- str o LoadBalancingStrategy
Cuando se inicie el equilibrio de carga, usará esta estrategia para reclamar y equilibrar la propiedad de la partición. Use " expansiones" o LoadBalancingStrategy.GREEDY para la estrategia expansible, que, para cada evaluación de equilibrio de carga, obtendrá tantas particiones sin reclamar necesarias para equilibrar la carga. Use "equilibrado" o LoadBalancingStrategy.BALANCED para la estrategia equilibrada, que, para cada evaluación de equilibrio de carga, solo reclama una partición que no sea reclamada por otros EventHubConsumerClient. Si otras particiones de Un EventHub son reclamadas por otros EventHubConsumerClient y este cliente ha reclamado demasiadas particiones, este cliente robará una partición de otros clientes para cada evaluación de equilibrio de carga independientemente de la estrategia de equilibrio de carga. La estrategia expansa se usa de forma predeterminada.
La dirección del punto de conexión personalizado que se va a usar para establecer una conexión con el servicio Event Hubs, lo que permite enrutar las solicitudes de red a través de las puertas de enlace de aplicaciones u otras rutas de acceso necesarias para el entorno host. El valor predeterminado es None. El formato sería "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Si no se especifica el puerto en el custom_endpoint_address, se usará el puerto 443 de forma predeterminada.
Ruta de acceso al archivo CA_BUNDLE personalizado del certificado SSL que se usa para autenticar la identidad del punto de conexión de conexión. El valor predeterminado es None en cuyo caso se usará certifi.where().
- uamqp_transport
- bool
Si se va a usar la biblioteca uamqp como transporte subyacente. El valor predeterminado es False y la biblioteca AMQP de Python pura se usará como transporte subyacente.
Tipo de valor devuelto
Ejemplos
Cree una nueva instancia de EventHubConsumerClient a partir de cadena de conexión.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Obtiene las propiedades del centro de eventos.
Las claves del diccionario devuelto incluyen:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Devoluciones
Diccionario que contiene información sobre el centro de eventos.
Tipo de valor devuelto
Excepciones
get_partition_ids
Obtiene los identificadores de partición del centro de eventos.
async get_partition_ids() -> List[str]
Devoluciones
Lista de identificadores de partición.
Tipo de valor devuelto
Excepciones
get_partition_properties
Obtiene las propiedades de la partición especificada.
Las claves del diccionario de propiedades incluyen:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parámetros
Devoluciones
Diccionario que contiene propiedades de partición.
Tipo de valor devuelto
Excepciones
receive
Reciba eventos de particiones, con equilibrio de carga opcional y puntos de control.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parámetros
- on_event
- Callable[PartitionContext, Optional[EventData]]
Función de devolución de llamada para controlar un evento recibido. La devolución de llamada toma dos parámetros: partition_context que contiene el contexto de partición y el evento que es el evento recibido. La función de devolución de llamada debe definirse como: on_event(partition_context, evento). Para obtener información detallada sobre el contexto de partición, consulte PartitionContext.
- max_wait_time
- float
Intervalo máximo en segundos que el procesador de eventos esperará antes de llamar a la devolución de llamada. Si no se recibe ningún evento dentro de este intervalo, se llamará a la devolución de llamada on_event con Ninguno. Si este valor se establece en Ninguno o 0 (valor predeterminado), no se llamará a la devolución de llamada hasta que se reciba un evento.
- partition_id
- str
Si se especifica, el cliente solo recibirá de esta partición. De lo contrario, el cliente recibirá de todas las particiones.
- owner_level
- int
Prioridad para un consumidor exclusivo. Se creará un consumidor exclusivo si se establece owner_level. Un consumidor con una mayor owner_level tiene una prioridad exclusiva más alta. El nivel de propietario también se conoce como el "valor de época" del consumidor.
- prefetch
- int
Número de eventos que se van a capturar previamente desde el servicio para su procesamiento. El valor predeterminado es 300.
- track_last_enqueued_event_properties
- bool
Indica si el consumidor debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos. Cuando se realiza un seguimiento de la información sobre el último evento en cola de las particiones, cada evento recibido del servicio Event Hubs llevará metadatos sobre la partición. Esto da como resultado una pequeña cantidad de consumo de ancho de banda de red adicional que generalmente es un equilibrio favorable cuando se tiene en cuenta con la realización periódica de solicitudes para propiedades de partición mediante el cliente del centro de eventos. Se establece en False de forma predeterminada.
Comience a recibir desde esta posición de evento si no hay datos de punto de comprobación para una partición. Los datos de punto de comprobación se usarán si están disponibles. Puede ser un dict con el identificador de partición como clave y posición como el valor de las particiones individuales, o un valor único para todas las particiones. El tipo de valor puede ser str, int o datetime.datetime. También se admiten los valores "-1" para recibir desde el principio de la secuencia y "@latest" para recibir solo eventos nuevos.
Determine si el starting_position especificado es inclusivo(>=) o no (>). True para inclusive y False para exclusivo. Puede ser un dict con el identificador de partición como clave y bool como valor que indica si el starting_position de una partición específica es inclusivo o no. También puede ser un valor bool único para todos los starting_position. El valor predeterminado es False.
- on_error
- Callable[[PartitionContext, Exception]]
Función de devolución de llamada a la que se llamará cuando se genere un error durante la recepción después de que se agoten los reintentos, o durante el proceso de equilibrio de carga. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y error como excepción. partition_context podría ser Ninguno si el error se genera durante el proceso de equilibrio de carga. La devolución de llamada debe definirse como: on_error(partition_context, error). También se llamará a la devolución de llamada on_error si se produce una excepción no controlada durante la devolución de llamada on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
Función de devolución de llamada a la que se llamará después de que un consumidor para una determinada partición finalice la inicialización. También se llamaría cuando se crea un nuevo consumidor de partición interna para asumir el proceso de recepción de un consumidor de partición interno con errores y cerrado. La devolución de llamada toma un único parámetro: partition_context que contiene la información de partición. La devolución de llamada debe definirse como: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Función de devolución de llamada a la que se llamará después de que se cierre un consumidor para una determinada partición. También se llamaría cuando se produce un error durante la recepción después de que se agoten los intentos de reintento. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y motivo del cierre. La devolución de llamada debe definirse como: on_partition_close(partition_context, reason). CloseReason Consulte las diversas razones de cierre.
Tipo de valor devuelto
Ejemplos
Recibir eventos de EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Reciba eventos de particiones en lotes, con equilibrio de carga opcional y puntos de control.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parámetros
- on_event_batch
- Callable[PartitionContext, List[EventData]]
Función de devolución de llamada para controlar un lote de eventos recibidos. La devolución de llamada toma dos parámetros: partition_context que contiene el contexto de partición y event_batch, que es los eventos recibidos. La función de devolución de llamada debe definirse como: on_event_batch(partition_context, event_batch). event_batch podría ser una lista vacía si max_wait_time no es None ni 0 y no se recibe ningún evento después de max_wait_time. Para obtener información detallada sobre el contexto de partición, consulte PartitionContext.
- max_batch_size
- int
Número máximo de eventos de un lote pasado a la devolución de llamada on_event_batch. Si el número real recibido de eventos es mayor que max_batch_size, los eventos recibidos se dividen en lotes y llaman a la devolución de llamada de cada lote con hasta max_batch_size eventos.
- max_wait_time
- float
Intervalo máximo en segundos que el procesador de eventos esperará antes de llamar a la devolución de llamada. Si no se recibe ningún evento dentro de este intervalo, se llamará a la devolución de llamada on_event_batch con una lista vacía. Si este valor se establece en Ninguno o 0 (valor predeterminado), no se llamará a la devolución de llamada hasta que se reciban eventos.
- partition_id
- str
Si se especifica, el cliente solo recibirá de esta partición. De lo contrario, el cliente recibirá de todas las particiones.
- owner_level
- int
Prioridad para un consumidor exclusivo. Se creará un consumidor exclusivo si se establece owner_level. Un consumidor con una mayor owner_level tiene una prioridad exclusiva más alta. El nivel de propietario también se conoce como el "valor de época" del consumidor.
- prefetch
- int
Número de eventos que se van a capturar previamente desde el servicio para su procesamiento. El valor predeterminado es 300.
- track_last_enqueued_event_properties
- bool
Indica si el consumidor debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos. Cuando se realiza un seguimiento de la información sobre el último evento en cola de las particiones, cada evento recibido del servicio Event Hubs llevará metadatos sobre la partición. Esto da como resultado una pequeña cantidad de consumo de ancho de banda de red adicional que generalmente es un equilibrio favorable cuando se tiene en cuenta con la realización periódica de solicitudes para propiedades de partición mediante el cliente del centro de eventos. Se establece en False de forma predeterminada.
Comience a recibir desde esta posición de evento si no hay datos de punto de comprobación para una partición. Los datos de punto de comprobación se usarán si están disponibles. Puede ser un dict con el identificador de partición como clave y posición como el valor de las particiones individuales, o un valor único para todas las particiones. El tipo de valor puede ser str, int o datetime.datetime. También se admiten los valores "-1" para recibir desde el principio de la secuencia y "@latest" para recibir solo eventos nuevos.
Determine si el starting_position especificado es inclusivo(>=) o no (>). True para inclusive y False para exclusivo. Puede ser un dict con el identificador de partición como clave y bool como valor que indica si el starting_position de una partición específica es inclusivo o no. También puede ser un valor bool único para todos los starting_position. El valor predeterminado es False.
- on_error
- Callable[[PartitionContext, Exception]]
Función de devolución de llamada a la que se llamará cuando se genere un error durante la recepción después de que se agoten los reintentos, o durante el proceso de equilibrio de carga. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y error como excepción. partition_context podría ser Ninguno si el error se genera durante el proceso de equilibrio de carga. La devolución de llamada debe definirse como: on_error(partition_context, error). También se llamará a la devolución de llamada on_error si se produce una excepción no controlada durante la devolución de llamada on_event .
- on_partition_initialize
- Callable[[PartitionContext]]
Función de devolución de llamada a la que se llamará después de que un consumidor para una determinada partición finalice la inicialización. También se llamaría cuando se crea un nuevo consumidor de partición interna para asumir el proceso de recepción de un consumidor de partición interno con errores y cerrado. La devolución de llamada toma un único parámetro: partition_context que contiene la información de partición. La devolución de llamada debe definirse como: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Función de devolución de llamada a la que se llamará después de que se cierre un consumidor para una determinada partición. También se llamaría cuando se produce un error durante la recepción después de que se agoten los intentos de reintento. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y motivo del cierre. La devolución de llamada debe definirse como: on_partition_close(partition_context, reason). CloseReason Consulte las diversas razones de cierre.
Tipo de valor devuelto
Ejemplos
Recibir eventos en lotes de EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python