Compartir a través de


EventHubConsumerClient Clase

La clase EventHubConsumerClient define una interfaz de alto nivel para recibir eventos del servicio Azure Event Hubs.

El objetivo principal de EventHubConsumerClient es recibir eventos de todas las particiones de un EventHub con equilibrio de carga y puntos de comprobación.

Cuando se ejecutan varias instancias de EventHubConsumerClient en el mismo centro de eventos, el grupo de consumidores y la ubicación de puntos de control, las particiones se distribuirán uniformemente entre ellos.

Para habilitar el equilibrio de carga y los puntos de control persistentes, checkpoint_store debe establecerse al crear EventHubConsumerClient. Si no se proporciona un almacén de puntos de control, el punto de control se mantendrá internamente en la memoria.

Un EventHubConsumerClient también puede recibir de una partición específica al llamar a su método receive() o receive_batch() y especificar el partition_id. El equilibrio de carga no funcionará en modo de partición única. Pero los usuarios todavía pueden guardar puntos de control si se establece el checkpoint_store.

Herencia
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Constructor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parámetros

fully_qualified_namespace
str
Requerido

Nombre de host completo para el espacio de nombres de Event Hubs. El formato del espacio de nombres es: .servicebus.windows.net.

eventhub_name
str
Requerido

Ruta de acceso del centro de eventos específico al que se va a conectar el cliente.

consumer_group
str
Requerido

Recibir eventos del centro de eventos para este grupo de consumidores.

credential
AsyncTokenCredential o AzureSasCredential o AzureNamedKeyCredential
Requerido

Objeto de credencial usado para la autenticación que implementa una interfaz determinada para obtener tokens. Acepta objetos de credenciales EventHubSharedKeyCredentialo generados por la biblioteca azure-identity y los objetos que implementan el método *get_token(self, scopes).

logging_enable
bool

Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.

auth_timeout
float

Tiempo en segundos para esperar a que el servicio autorice un token. El valor predeterminado es de 60 segundos. Si se establece en 0, no se aplicará ningún tiempo de espera desde el cliente.

user_agent
str

Si se especifica, se agregará delante de la cadena del agente de usuario.

retry_total
int

Número total de intentos de rehacer una operación con error cuando se produce un error. El valor predeterminado es 3. El contexto de retry_total en la recepción es especial: el método receive se implementa mediante un método de recepción interno que llama al bucle while en cada iteración. En el caso de recepción , retry_total especifica los números de reintento después del error generado por el método de recepción interno en el bucle while. Si se agotan los reintentos, se llamará a la devolución de llamada on_error (si se proporciona) con la información de error. Se cerrará el consumidor de partición interna con error (se llamará a on_partition_close si se proporciona) y se creará un nuevo consumidor de partición interna (se llamará on_partition_initialize si se proporciona) para reanudar la recepción.

retry_backoff_factor
float

Un factor de retroceso que se va a aplicar entre los intentos después del segundo intento (la mayoría de los errores se resuelven inmediatamente mediante un segundo intento sin retraso). En modo fijo, la directiva de reintento siempre se suspenderá para {factor de retroceso}. En el modo "exponencial", la directiva de reintento se suspenderá durante: {factor de retroceso} * (2 ** ({número de reintentos totales} - 1)) segundos. Si el backoff_factor es 0,1, el reintento se suspenderá para [0.0s, 0.2s, 0.4s, ...] entre reintentos. El valor predeterminado es 0,8.

retry_backoff_max
float

Tiempo de retroceso máximo. El valor predeterminado es 120 segundos (2 minutos).

retry_mode
str

Comportamiento de retraso entre reintentos. Los valores admitidos son "fijo" o "exponencial", donde el valor predeterminado es "exponencial".

idle_timeout
float

Tiempo de espera, en segundos, después del cual este cliente cerrará la conexión subyacente si no hay ninguna actividad adicional. De forma predeterminada, el valor es None, lo que significa que el cliente no se apagará debido a la inactividad a menos que el servicio lo inicie.

transport_type
TransportType

Tipo de protocolo de transporte que se usará para comunicarse con el servicio Event Hubs. El valor predeterminado es TransportType.Amqp en cuyo caso se usa el puerto 5671. Si el puerto 5671 no está disponible o bloqueado en el entorno de red, TransportType.AmqpOverWebsocket podría usarse en su lugar, que usa el puerto 443 para la comunicación.

http_proxy

Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int).

checkpoint_store
Optional[CheckpointStore]

Administrador que almacena los datos de equilibrio de carga y punto de comprobación de la partición al recibir eventos. El almacén de puntos de control se usará en ambos casos de recepción de todas las particiones o de una sola partición. En este último caso, no se aplica el equilibrio de carga. Si no se proporciona un almacén de puntos de control, el punto de control se mantendrá internamente en la memoria y la instancia de EventHubConsumerClient recibirá eventos sin equilibrio de carga.

load_balancing_interval
float

Cuando se inicia el equilibrio de carga. Este es el intervalo, en segundos, entre dos evaluaciones de equilibrio de carga. El valor predeterminado es 30 segundos.

partition_ownership_expiration_interval
float

Una propiedad de partición expirará después de este número de segundos. Cada evaluación de equilibrio de carga extenderá automáticamente el tiempo de expiración de la propiedad. El valor predeterminado es 6 * load_balancing_interval, es decir, 180 segundos al usar el load_balancing_interval predeterminado de 30 segundos.

load_balancing_strategy
str o LoadBalancingStrategy

Cuando se inicie el equilibrio de carga, usará esta estrategia para reclamar y equilibrar la propiedad de la partición. Use "expansible" o LoadBalancingStrategy.GREEDY para la estrategia expansible, que, para cada evaluación de equilibrio de carga, capturará tantas particiones no reclamadas necesarias para equilibrar la carga. Use "equilibrado" o LoadBalancingStrategy.BALANCED para la estrategia equilibrada, que, para cada evaluación de equilibrio de carga, solo reclama una partición que no sea reclamada por otros EventHubConsumerClient. Si otras particiones de Un EventHub son reclamadas por otros EventHubConsumerClient y este cliente ha reclamado demasiadas particiones, este cliente robará una partición de otros clientes para cada evaluación de equilibrio de carga, independientemente de la estrategia de equilibrio de carga. La estrategia expansa se usa de forma predeterminada.

custom_endpoint_address
Optional[str]

La dirección del punto de conexión personalizado que se va a usar para establecer una conexión con el servicio Event Hubs, lo que permite enrutar las solicitudes de red a través de las puertas de enlace de aplicaciones u otras rutas de acceso necesarias para el entorno host. El valor predeterminado es None. El formato sería "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Si no se especifica el puerto en el custom_endpoint_address, se usará el puerto 443 de forma predeterminada.

connection_verify
Optional[str]

Ruta de acceso al archivo de CA_BUNDLE personalizado del certificado SSL que se usa para autenticar la identidad del punto de conexión. El valor predeterminado es None en cuyo caso se usará certifi.where().

uamqp_transport
bool

Si se debe usar la biblioteca uamqp como transporte subyacente. El valor predeterminado es False y la biblioteca AMQP de Python pura se usará como transporte subyacente.

socket_timeout
float

Tiempo en segundos que el socket subyacente de la conexión debe esperar al enviar y recibir datos antes de que se agote el tiempo de espera. El valor predeterminado es 0.2 para TransportType.Amqp y 1 para TransportType.AmqpOverWebsocket. Si se producen errores de EventHubsConnectionError debido al tiempo de espera de escritura, es posible que sea necesario pasar un valor mayor que el predeterminado. Esto es para escenarios de uso avanzado y normalmente el valor predeterminado debe ser suficiente.

Ejemplos

Cree una nueva instancia de EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Métodos

close

Detenga la recuperación de eventos del centro de eventos y cierre la conexión y los vínculos de AMQP subyacentes.

from_connection_string

Cree un EventHubConsumerClient a partir de un cadena de conexión.

get_eventhub_properties

Obtiene las propiedades del centro de eventos.

Las claves del diccionario devuelto incluyen:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Obtiene los identificadores de partición del centro de eventos.

get_partition_properties

Obtiene las propiedades de la partición especificada.

Las claves del diccionario de propiedades incluyen:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Reciba eventos de particiones, con equilibrio de carga opcional y puntos de control.

receive_batch

Reciba eventos de particiones en lotes, con equilibrio de carga opcional y puntos de control.

close

Detenga la recuperación de eventos del centro de eventos y cierre la conexión y los vínculos de AMQP subyacentes.

async close() -> None

Tipo de valor devuelto

Ejemplos

Cierre el cliente.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Cree un EventHubConsumerClient a partir de un cadena de conexión.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parámetros

conn_str
str
Requerido

El cadena de conexión de un centro de eventos.

consumer_group
str
Requerido

Recibir eventos del Centro de eventos para este grupo de consumidores.

eventhub_name
str

Ruta de acceso del centro de eventos específico al que se va a conectar el cliente.

logging_enable
bool

Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.

http_proxy
dict

Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int). Además, las claves siguientes también pueden estar presentes: "username", "password".

auth_timeout
float

Tiempo en segundos para esperar a que el servicio autorice un token. El valor predeterminado es de 60 segundos. Si se establece en 0, no se aplicará ningún tiempo de espera desde el cliente.

user_agent
str

Si se especifica, se agregará delante de la cadena del agente de usuario.

retry_total
int

Número total de intentos de rehacer una operación con error cuando se produce un error. El valor predeterminado es 3. El contexto de retry_total en la recepción es especial: el método receive se implementa mediante un método de recepción interno que llama al bucle while en cada iteración. En el caso de recepción , retry_total especifica los números de reintento después del error generado por el método de recepción interno en el bucle while. Si se agotan los reintentos, se llamará a la devolución de llamada on_error (si se proporciona) con la información de error. Se cerrará el consumidor de partición interna con error (se llamará a on_partition_close si se proporciona) y se creará un nuevo consumidor de partición interna (se llamará on_partition_initialize si se proporciona) para reanudar la recepción.

retry_backoff_factor
float

Un factor de retroceso que se va a aplicar entre los intentos después del segundo intento (la mayoría de los errores se resuelven inmediatamente mediante un segundo intento sin retraso). En el modo fijo, la directiva de reintento siempre se suspenderá para {factor de retroceso}. En el modo "exponencial", la directiva de reintento se suspenderá para: {factor de retroceso} * (2 ** ({número de reintentos totales} - 1)) segundos. Si el backoff_factor es 0.1, el reintento se suspenderá para [0.0s, 0.2s, 0.4s, ...] entre reintentos. El valor predeterminado es 0,8.

retry_backoff_max
float

Tiempo de retroceso máximo. El valor predeterminado es 120 segundos (2 minutos).

retry_mode
str

Comportamiento de retraso entre los reintentos. Los valores admitidos son "fijo" o "exponencial", donde el valor predeterminado es "exponencial".

idle_timeout
float

Tiempo de espera, en segundos, después del cual este cliente cerrará la conexión subyacente si no hay ninguna actividad adicional. De forma predeterminada, el valor es None, lo que significa que el cliente no se apagará debido a inactividad a menos que el servicio lo inicie.

transport_type
TransportType

Tipo de protocolo de transporte que se usará para comunicarse con el servicio Event Hubs. El valor predeterminado es TransportType.Amqp en cuyo caso se usa el puerto 5671. Si el puerto 5671 no está disponible o bloqueado en el entorno de red, TransportType.AmqpOverWebsocket podría usarse en su lugar, que usa el puerto 443 para la comunicación.

checkpoint_store
Optional[CheckpointStore]

Administrador que almacena los datos de equilibrio de carga de partición y punto de control al recibir eventos. El almacén de puntos de comprobación se usará en ambos casos de recepción de todas las particiones o de una sola partición. En este último caso, no se aplica el equilibrio de carga. Si no se proporciona un almacén de puntos de control, el punto de control se mantendrá internamente en memoria y la instancia de EventHubConsumerClient recibirá eventos sin equilibrio de carga.

load_balancing_interval
float

Cuando se inicia el equilibrio de carga. Este es el intervalo, en segundos, entre dos evaluaciones de equilibrio de carga. El valor predeterminado es 30 segundos.

partition_ownership_expiration_interval
float

Una propiedad de partición expirará después de este número de segundos. Cada evaluación de equilibrio de carga extenderá automáticamente el tiempo de expiración de la propiedad. El valor predeterminado es 6 * load_balancing_interval, es decir, 180 segundos cuando se usa el load_balancing_interval predeterminado de 30 segundos.

load_balancing_strategy
str o LoadBalancingStrategy

Cuando se inicie el equilibrio de carga, usará esta estrategia para reclamar y equilibrar la propiedad de la partición. Use " expansiones" o LoadBalancingStrategy.GREEDY para la estrategia expansible, que, para cada evaluación de equilibrio de carga, obtendrá tantas particiones sin reclamar necesarias para equilibrar la carga. Use "equilibrado" o LoadBalancingStrategy.BALANCED para la estrategia equilibrada, que, para cada evaluación de equilibrio de carga, solo reclama una partición que no sea reclamada por otros EventHubConsumerClient. Si otras particiones de Un EventHub son reclamadas por otros EventHubConsumerClient y este cliente ha reclamado demasiadas particiones, este cliente robará una partición de otros clientes para cada evaluación de equilibrio de carga independientemente de la estrategia de equilibrio de carga. La estrategia expansa se usa de forma predeterminada.

custom_endpoint_address
Optional[str]

La dirección del punto de conexión personalizado que se va a usar para establecer una conexión con el servicio Event Hubs, lo que permite enrutar las solicitudes de red a través de las puertas de enlace de aplicaciones u otras rutas de acceso necesarias para el entorno host. El valor predeterminado es None. El formato sería "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Si no se especifica el puerto en el custom_endpoint_address, se usará el puerto 443 de forma predeterminada.

connection_verify
Optional[str]

Ruta de acceso al archivo CA_BUNDLE personalizado del certificado SSL que se usa para autenticar la identidad del punto de conexión de conexión. El valor predeterminado es None en cuyo caso se usará certifi.where().

uamqp_transport
bool

Si se va a usar la biblioteca uamqp como transporte subyacente. El valor predeterminado es False y la biblioteca AMQP de Python pura se usará como transporte subyacente.

Tipo de valor devuelto

Ejemplos

Cree una nueva instancia de EventHubConsumerClient a partir de cadena de conexión.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Obtiene las propiedades del centro de eventos.

Las claves del diccionario devuelto incluyen:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Devoluciones

Diccionario que contiene información sobre el centro de eventos.

Tipo de valor devuelto

Excepciones

get_partition_ids

Obtiene los identificadores de partición del centro de eventos.

async get_partition_ids() -> List[str]

Devoluciones

Lista de identificadores de partición.

Tipo de valor devuelto

Excepciones

get_partition_properties

Obtiene las propiedades de la partición especificada.

Las claves del diccionario de propiedades incluyen:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parámetros

partition_id
str
Requerido

Identificador de partición de destino.

Devoluciones

Diccionario que contiene propiedades de partición.

Tipo de valor devuelto

Excepciones

receive

Reciba eventos de particiones, con equilibrio de carga opcional y puntos de control.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parámetros

on_event
Callable[PartitionContext, Optional[EventData]]
Requerido

Función de devolución de llamada para controlar un evento recibido. La devolución de llamada toma dos parámetros: partition_context que contiene el contexto de partición y el evento que es el evento recibido. La función de devolución de llamada debe definirse como: on_event(partition_context, evento). Para obtener información detallada sobre el contexto de partición, consulte PartitionContext.

max_wait_time
float

Intervalo máximo en segundos que el procesador de eventos esperará antes de llamar a la devolución de llamada. Si no se recibe ningún evento dentro de este intervalo, se llamará a la devolución de llamada on_event con Ninguno. Si este valor se establece en Ninguno o 0 (valor predeterminado), no se llamará a la devolución de llamada hasta que se reciba un evento.

partition_id
str

Si se especifica, el cliente solo recibirá de esta partición. De lo contrario, el cliente recibirá de todas las particiones.

owner_level
int

Prioridad para un consumidor exclusivo. Se creará un consumidor exclusivo si se establece owner_level. Un consumidor con una mayor owner_level tiene una prioridad exclusiva más alta. El nivel de propietario también se conoce como el "valor de época" del consumidor.

prefetch
int

Número de eventos que se van a capturar previamente desde el servicio para su procesamiento. El valor predeterminado es 300.

track_last_enqueued_event_properties
bool

Indica si el consumidor debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos. Cuando se realiza un seguimiento de la información sobre el último evento en cola de las particiones, cada evento recibido del servicio Event Hubs llevará metadatos sobre la partición. Esto da como resultado una pequeña cantidad de consumo de ancho de banda de red adicional que generalmente es un equilibrio favorable cuando se tiene en cuenta con la realización periódica de solicitudes para propiedades de partición mediante el cliente del centro de eventos. Se establece en False de forma predeterminada.

starting_position
str, int, datetime o dict[str,any]

Comience a recibir desde esta posición de evento si no hay datos de punto de comprobación para una partición. Los datos de punto de comprobación se usarán si están disponibles. Puede ser un dict con el identificador de partición como clave y posición como el valor de las particiones individuales, o un valor único para todas las particiones. El tipo de valor puede ser str, int o datetime.datetime. También se admiten los valores "-1" para recibir desde el principio de la secuencia y "@latest" para recibir solo eventos nuevos.

starting_position_inclusive
bool o dict[str,bool]

Determine si el starting_position especificado es inclusivo(>=) o no (>). True para inclusive y False para exclusivo. Puede ser un dict con el identificador de partición como clave y bool como valor que indica si el starting_position de una partición específica es inclusivo o no. También puede ser un valor bool único para todos los starting_position. El valor predeterminado es False.

on_error
Callable[[PartitionContext, Exception]]

Función de devolución de llamada a la que se llamará cuando se genere un error durante la recepción después de que se agoten los reintentos, o durante el proceso de equilibrio de carga. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y error como excepción. partition_context podría ser Ninguno si el error se genera durante el proceso de equilibrio de carga. La devolución de llamada debe definirse como: on_error(partition_context, error). También se llamará a la devolución de llamada on_error si se produce una excepción no controlada durante la devolución de llamada on_event .

on_partition_initialize
Callable[[PartitionContext]]

Función de devolución de llamada a la que se llamará después de que un consumidor para una determinada partición finalice la inicialización. También se llamaría cuando se crea un nuevo consumidor de partición interna para asumir el proceso de recepción de un consumidor de partición interno con errores y cerrado. La devolución de llamada toma un único parámetro: partition_context que contiene la información de partición. La devolución de llamada debe definirse como: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Función de devolución de llamada a la que se llamará después de que se cierre un consumidor para una determinada partición. También se llamaría cuando se produce un error durante la recepción después de que se agoten los intentos de reintento. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y motivo del cierre. La devolución de llamada debe definirse como: on_partition_close(partition_context, reason). CloseReason Consulte las diversas razones de cierre.

Tipo de valor devuelto

Ejemplos

Recibir eventos de EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Reciba eventos de particiones en lotes, con equilibrio de carga opcional y puntos de control.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parámetros

on_event_batch
Callable[PartitionContext, List[EventData]]
Requerido

Función de devolución de llamada para controlar un lote de eventos recibidos. La devolución de llamada toma dos parámetros: partition_context que contiene el contexto de partición y event_batch, que es los eventos recibidos. La función de devolución de llamada debe definirse como: on_event_batch(partition_context, event_batch). event_batch podría ser una lista vacía si max_wait_time no es None ni 0 y no se recibe ningún evento después de max_wait_time. Para obtener información detallada sobre el contexto de partición, consulte PartitionContext.

max_batch_size
int

Número máximo de eventos de un lote pasado a la devolución de llamada on_event_batch. Si el número real recibido de eventos es mayor que max_batch_size, los eventos recibidos se dividen en lotes y llaman a la devolución de llamada de cada lote con hasta max_batch_size eventos.

max_wait_time
float

Intervalo máximo en segundos que el procesador de eventos esperará antes de llamar a la devolución de llamada. Si no se recibe ningún evento dentro de este intervalo, se llamará a la devolución de llamada on_event_batch con una lista vacía. Si este valor se establece en Ninguno o 0 (valor predeterminado), no se llamará a la devolución de llamada hasta que se reciban eventos.

partition_id
str

Si se especifica, el cliente solo recibirá de esta partición. De lo contrario, el cliente recibirá de todas las particiones.

owner_level
int

Prioridad para un consumidor exclusivo. Se creará un consumidor exclusivo si se establece owner_level. Un consumidor con una mayor owner_level tiene una prioridad exclusiva más alta. El nivel de propietario también se conoce como el "valor de época" del consumidor.

prefetch
int

Número de eventos que se van a capturar previamente desde el servicio para su procesamiento. El valor predeterminado es 300.

track_last_enqueued_event_properties
bool

Indica si el consumidor debe solicitar información sobre el último evento en cola en su partición asociada y realizar un seguimiento de esa información a medida que se reciben los eventos. Cuando se realiza un seguimiento de la información sobre el último evento en cola de las particiones, cada evento recibido del servicio Event Hubs llevará metadatos sobre la partición. Esto da como resultado una pequeña cantidad de consumo de ancho de banda de red adicional que generalmente es un equilibrio favorable cuando se tiene en cuenta con la realización periódica de solicitudes para propiedades de partición mediante el cliente del centro de eventos. Se establece en False de forma predeterminada.

starting_position
str, int, datetime o dict[str,any]

Comience a recibir desde esta posición de evento si no hay datos de punto de comprobación para una partición. Los datos de punto de comprobación se usarán si están disponibles. Puede ser un dict con el identificador de partición como clave y posición como el valor de las particiones individuales, o un valor único para todas las particiones. El tipo de valor puede ser str, int o datetime.datetime. También se admiten los valores "-1" para recibir desde el principio de la secuencia y "@latest" para recibir solo eventos nuevos.

starting_position_inclusive
bool o dict[str,bool]

Determine si el starting_position especificado es inclusivo(>=) o no (>). True para inclusive y False para exclusivo. Puede ser un dict con el identificador de partición como clave y bool como valor que indica si el starting_position de una partición específica es inclusivo o no. También puede ser un valor bool único para todos los starting_position. El valor predeterminado es False.

on_error
Callable[[PartitionContext, Exception]]

Función de devolución de llamada a la que se llamará cuando se genere un error durante la recepción después de que se agoten los reintentos, o durante el proceso de equilibrio de carga. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y error como excepción. partition_context podría ser Ninguno si el error se genera durante el proceso de equilibrio de carga. La devolución de llamada debe definirse como: on_error(partition_context, error). También se llamará a la devolución de llamada on_error si se produce una excepción no controlada durante la devolución de llamada on_event .

on_partition_initialize
Callable[[PartitionContext]]

Función de devolución de llamada a la que se llamará después de que un consumidor para una determinada partición finalice la inicialización. También se llamaría cuando se crea un nuevo consumidor de partición interna para asumir el proceso de recepción de un consumidor de partición interno con errores y cerrado. La devolución de llamada toma un único parámetro: partition_context que contiene la información de partición. La devolución de llamada debe definirse como: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Función de devolución de llamada a la que se llamará después de que se cierre un consumidor para una determinada partición. También se llamaría cuando se produce un error durante la recepción después de que se agoten los intentos de reintento. La devolución de llamada toma dos parámetros: partition_context que contiene información de partición y motivo del cierre. La devolución de llamada debe definirse como: on_partition_close(partition_context, reason). CloseReason Consulte las diversas razones de cierre.

Tipo de valor devuelto

Ejemplos

Recibir eventos en lotes de EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )