ServiceBusReceiver Clase
La clase ServiceBusReceiver define una interfaz de alto nivel para recibir mensajes de la suscripción de Azure Service Bus Cola o Tema.
Los dos canales principales para la recepción de mensajes son receive() para realizar una única solicitud de mensajes y asincrónico para el mensaje en el receptor: para recibir continuamente los mensajes entrantes de forma continua.
Use el get_<queue/subscription>_receiver
método de ~azure.servicebus.aio.ServiceBusClient para crear una instancia de ServiceBusReceiver.
- Herencia
-
ServiceBusReceiverazure.servicebus.aio._base_handler_async.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
Constructor
ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
Parámetros
- fully_qualified_namespace
- str
Nombre de host completo para el espacio de nombres de Service Bus. El formato del espacio de nombres es: .servicebus.windows.net.
- credential
- AsyncTokenCredential o AzureSasCredential o AzureNamedKeyCredential
Objeto de credencial usado para la autenticación que implementa una interfaz determinada para obtener tokens. Acepta objetos de credenciales generados por la biblioteca azure-identity y los objetos que implementan el método *get_token(self, scopes) o, como alternativa, también se puede proporcionar una instancia de AzureSasCredential.
- queue_name
- str
Ruta de acceso de una cola específica de Service Bus a la que se conecta el cliente.
- topic_name
- str
Ruta de acceso del tema de Service Bus específico al que contiene la suscripción a la que se conecta el cliente.
- subscription_name
- str
La ruta de acceso de una suscripción de Service Bus específica en el tema especificado al que se conecta el cliente.
- receive_mode
- Union[ServiceBusReceiveMode, str]
Modo con el que se recuperarán los mensajes de la entidad. Las dos opciones son PEEK_LOCK y RECEIVE_AND_DELETE. Los mensajes recibidos con PEEK_LOCK deben liquidarse dentro de un período de bloqueo determinado antes de que se quiten de la cola. Los mensajes recibidos con RECEIVE_AND_DELETE se quitarán inmediatamente de la cola y no se podrán abandonar ni volver a recibir posteriormente si el cliente no puede procesar el mensaje. El modo predeterminado es PEEK_LOCK.
Tiempo de espera en segundos entre los mensajes recibidos después del cual el receptor dejará de recibirse automáticamente. El valor predeterminado es None, lo que significa que no hay tiempo de espera.
- logging_enable
- bool
Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.
- transport_type
- TransportType
Tipo de protocolo de transporte que se usará para comunicarse con el servicio de Service Bus. El valor predeterminado es TransportType.Amqp.
- http_proxy
- Dict
Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int). Además, las claves siguientes también pueden estar presentes: "username", "password".
- user_agent
- str
Si se especifica, se agregará delante de la cadena del agente de usuario integrada.
- auto_lock_renewer
- Optional[AutoLockRenewer]
Se puede proporcionar un ~azure.servicebus.aio.AutoLockRenewer para que los mensajes se registren automáticamente al recibirlos. Si el receptor es un receptor de sesión, se aplicará a la sesión en su lugar.
- prefetch_count
- int
Número máximo de mensajes que se van a almacenar en caché con cada solicitud al servicio. Esta configuración solo es para la optimización avanzada del rendimiento. Aumentar este valor mejorará el rendimiento del mensaje, pero aumentará la posibilidad de que los mensajes expiren mientras se almacenan en caché si no se procesan lo suficientemente rápido. El valor predeterminado es 0, lo que significa que los mensajes se recibirán del servicio y se procesarán de uno en uno. En el caso de que prefetch_count sea 0, ServiceBusReceiver.receive intentaría almacenar en caché max_message_count (si se proporciona) dentro de su solicitud al servicio.
- client_identifier
- str
Identificador basado en cadenas para identificar de forma única la instancia de cliente. Service Bus lo asociará a algunos mensajes de error para facilitar la correlación de errores. Si no se especifica, se generará un identificador único.
- socket_timeout
- float
Tiempo en segundos que el socket subyacente de la conexión debe esperar al enviar y recibir datos antes de que se agote el tiempo de espera. El valor predeterminado es 0.2 para TransportType.Amqp y 1 para TransportType.AmqpOverWebsocket. Si se producen errores de conexión debido al tiempo de espera de escritura, es posible que sea necesario pasar un valor mayor que el predeterminado.
Variables
- fully_qualified_namespace
- str
Nombre de host completo para el espacio de nombres de Service Bus. El formato del espacio de nombres es: .servicebus.windows.net.
- entity_path
- str
Ruta de acceso de la entidad a la que se conecta el cliente.
Métodos
abandon_message |
Abandone el mensaje. Este mensaje se devolverá a la cola y estará disponible para que se reciba de nuevo. |
close | |
complete_message |
Complete el mensaje. Esto quita el mensaje de la cola. |
dead_letter_message |
Mueva el mensaje a la cola de mensajes fallidos. La cola de mensajes fallidos es una subcola que se puede usar para almacenar los mensajes que no se pudieron procesar correctamente o, de lo contrario, requerir una inspección o un procesamiento adicionales. La cola también se puede configurar para enviar mensajes expirados a la cola de mensajes fallidos. |
defer_message |
Aplaza el mensaje. Este mensaje permanecerá en la cola, pero debe solicitarse específicamente por su número de secuencia para que se reciba. |
peek_messages |
Examine los mensajes pendientes actualmente en la cola. Los mensajes inspeccionados no se quitan de la cola ni están bloqueados. No se pueden completar, aplazar o no escribir mensajes fallidos. |
receive_deferred_messages |
Recibir mensajes que se han aplazado anteriormente. Al recibir mensajes diferidos de una entidad con particiones, todos los números de secuencia proporcionados deben ser mensajes de la misma partición. |
receive_messages |
Recibir un lote de mensajes a la vez. Este enfoque es óptimo si desea procesar varios mensajes simultáneamente o realizar una recepción ad hoc como una sola llamada. Tenga en cuenta que el número de mensajes recuperados en un solo lote dependerá de si se estableció prefetch_count para el receptor. Si no se establece prefetch_count para el receptor, el receptor intentará almacenar en caché max_message_count mensajes (si se proporcionan) dentro de la solicitud al servicio. Esta llamada dará prioridad a la devolución rápidamente sobre la reunión de un tamaño de lote especificado, por lo que devolverá tan pronto como se reciba al menos un mensaje y haya un intervalo en los mensajes entrantes, independientemente del tamaño de lote especificado. |
renew_message_lock |
Renueve el bloqueo del mensaje. Esto mantendrá el bloqueo en el mensaje para asegurarse de que no se devuelve a la cola que se va a volver a procesar. Para completar (o liquidar) el mensaje, el bloqueo debe mantenerse y no puede haber expirado; No se puede renovar un bloqueo expirado. Los mensajes recibidos a través del modo RECEIVE_AND_DELETE no están bloqueados y, por lo tanto, no se pueden renovar. Esta operación solo está disponible para los mensajes que no son de sesión. |
abandon_message
Abandone el mensaje.
Este mensaje se devolverá a la cola y estará disponible para que se reciba de nuevo.
async abandon_message(message: ServiceBusReceivedMessage) -> None
Parámetros
Tipo de valor devuelto
Excepciones
Ejemplos
Abandone un mensaje recibido.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.abandon_message(message)
close
async close() -> None
Excepciones
complete_message
Complete el mensaje.
Esto quita el mensaje de la cola.
async complete_message(message: ServiceBusReceivedMessage) -> None
Parámetros
Tipo de valor devuelto
Excepciones
Ejemplos
Complete un mensaje recibido.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.complete_message(message)
dead_letter_message
Mueva el mensaje a la cola de mensajes fallidos.
La cola de mensajes fallidos es una subcola que se puede usar para almacenar los mensajes que no se pudieron procesar correctamente o, de lo contrario, requerir una inspección o un procesamiento adicionales. La cola también se puede configurar para enviar mensajes expirados a la cola de mensajes fallidos.
async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
Parámetros
- message
- ServiceBusReceivedMessage
Mensaje recibido que se va a enviar por mensajes fallidos.
Descripción detallada del error para enviar mensajes fallidos al mensaje.
Tipo de valor devuelto
Excepciones
Ejemplos
Mensaje fallido recibido.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.dead_letter_message(message)
defer_message
Aplaza el mensaje.
Este mensaje permanecerá en la cola, pero debe solicitarse específicamente por su número de secuencia para que se reciba.
async defer_message(message: ServiceBusReceivedMessage) -> None
Parámetros
Tipo de valor devuelto
Excepciones
Ejemplos
Aplazar un mensaje recibido.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.defer_message(message)
peek_messages
Examine los mensajes pendientes actualmente en la cola.
Los mensajes inspeccionados no se quitan de la cola ni están bloqueados. No se pueden completar, aplazar o no escribir mensajes fallidos.
async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parámetros
- max_message_count
- int
Número máximo de mensajes que se van a intentar ver. El valor predeterminado es 1.
- sequence_number
- int
Número de secuencia de mensajes desde el que se van a empezar a examinar los mensajes.
Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no hay tiempo de espera.
Devoluciones
Lista de objetos ~azure.servicebus.ServiceBusReceivedMessage.
Tipo de valor devuelto
Excepciones
Ejemplos
Ver los mensajes en la cola.
async with servicebus_receiver:
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
Recibir mensajes que se han aplazado anteriormente.
Al recibir mensajes diferidos de una entidad con particiones, todos los números de secuencia proporcionados deben ser mensajes de la misma partición.
async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
Parámetros
Lista de los números de secuencia de mensajes que se han aplazado.
Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no hay tiempo de espera.
Devoluciones
Una lista de los mensajes recibidos.
Tipo de valor devuelto
Excepciones
Ejemplos
Recibir mensajes diferidos de ServiceBus.
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
await servicebus_receiver.defer_message(message)
received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for message in received_deferred_msg:
await servicebus_receiver.complete_message(message)
receive_messages
Recibir un lote de mensajes a la vez.
Este enfoque es óptimo si desea procesar varios mensajes simultáneamente o realizar una recepción ad hoc como una sola llamada.
Tenga en cuenta que el número de mensajes recuperados en un solo lote dependerá de si se estableció prefetch_count para el receptor. Si no se establece prefetch_count para el receptor, el receptor intentará almacenar en caché max_message_count mensajes (si se proporcionan) dentro de la solicitud al servicio.
Esta llamada dará prioridad a la devolución rápidamente sobre la reunión de un tamaño de lote especificado, por lo que devolverá tan pronto como se reciba al menos un mensaje y haya un intervalo en los mensajes entrantes, independientemente del tamaño de lote especificado.
async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
Parámetros
Número máximo de mensajes del lote. El número real devuelto dependerá de prefetch_count tamaño y velocidad de flujo entrante. Establecer en Ninguno dependerá completamente de la configuración de captura previa. El valor predeterminado es 1.
Tiempo máximo de espera en segundos para que llegue el primer mensaje. Si no llegan mensajes y no se especifica ningún tiempo de espera, esta llamada no devolverá hasta que se cierre la conexión. Si se especifica y no llegan mensajes dentro del período de tiempo de espera, se devolverá una lista vacía.
Devoluciones
Lista de mensajes recibidos. Si no hay mensajes disponibles, será una lista vacía.
Tipo de valor devuelto
Excepciones
Ejemplos
Recibir mensajes de ServiceBus.
async with servicebus_receiver:
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
await servicebus_receiver.complete_message(message)
renew_message_lock
Renueve el bloqueo del mensaje.
Esto mantendrá el bloqueo en el mensaje para asegurarse de que no se devuelve a la cola que se va a volver a procesar.
Para completar (o liquidar) el mensaje, el bloqueo debe mantenerse y no puede haber expirado; No se puede renovar un bloqueo expirado.
Los mensajes recibidos a través del modo RECEIVE_AND_DELETE no están bloqueados y, por lo tanto, no se pueden renovar. Esta operación solo está disponible para los mensajes que no son de sesión.
async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
Parámetros
Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no hay tiempo de espera.
Devoluciones
Fecha y hora utc en la que se establece el bloqueo para que expire en .
Tipo de valor devuelto
Excepciones
Ejemplos
Renueve el bloqueo en un mensaje recibido.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.renew_message_lock(message)
Atributos
client_identifier
Obtenga el identificador de cliente de ServiceBusReceiver asociado a la instancia del receptor.
Tipo de valor devuelto
session
Obtenga el objeto ServiceBusSession vinculado con el receptor. La sesión solo está disponible para las entidades habilitadas para la sesión; devolvería None si se llama a en un receptor no con sesión.
Tipo de valor devuelto
Ejemplos
Obtener sesión de un receptor
async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session