Compartir a través de


ServiceBusReceiver Clase

La clase ServiceBusReceiver define una interfaz de alto nivel para recibir mensajes de la suscripción de Azure Service Bus Cola o Tema.

Los dos canales principales para la recepción de mensajes son receive() para realizar una única solicitud de mensajes y asincrónico para el mensaje en el receptor: para recibir continuamente los mensajes entrantes de forma continua.

Use el get_<queue/subscription>_receiver método de ~azure.servicebus.aio.ServiceBusClient para crear una instancia de ServiceBusReceiver.

Herencia
ServiceBusReceiver
azure.servicebus.aio._base_handler_async.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Constructor

ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Parámetros

fully_qualified_namespace
str
Requerido

Nombre de host completo para el espacio de nombres de Service Bus. El formato del espacio de nombres es: .servicebus.windows.net.

credential
AsyncTokenCredential o AzureSasCredential o AzureNamedKeyCredential
Requerido

Objeto de credencial usado para la autenticación que implementa una interfaz determinada para obtener tokens. Acepta objetos de credenciales generados por la biblioteca azure-identity y los objetos que implementan el método *get_token(self, scopes) o, como alternativa, también se puede proporcionar una instancia de AzureSasCredential.

queue_name
str

Ruta de acceso de una cola específica de Service Bus a la que se conecta el cliente.

topic_name
str

Ruta de acceso del tema de Service Bus específico al que contiene la suscripción a la que se conecta el cliente.

subscription_name
str

La ruta de acceso de una suscripción de Service Bus específica en el tema especificado al que se conecta el cliente.

receive_mode
Union[ServiceBusReceiveMode, str]

Modo con el que se recuperarán los mensajes de la entidad. Las dos opciones son PEEK_LOCK y RECEIVE_AND_DELETE. Los mensajes recibidos con PEEK_LOCK deben liquidarse dentro de un período de bloqueo determinado antes de que se quiten de la cola. Los mensajes recibidos con RECEIVE_AND_DELETE se quitarán inmediatamente de la cola y no se podrán abandonar ni volver a recibir posteriormente si el cliente no puede procesar el mensaje. El modo predeterminado es PEEK_LOCK.

max_wait_time
Optional[float]

Tiempo de espera en segundos entre los mensajes recibidos después del cual el receptor dejará de recibirse automáticamente. El valor predeterminado es None, lo que significa que no hay tiempo de espera.

logging_enable
bool

Indica si se van a generar registros de seguimiento de red en el registrador. El valor predeterminado es false.

transport_type
TransportType

Tipo de protocolo de transporte que se usará para comunicarse con el servicio de Service Bus. El valor predeterminado es TransportType.Amqp.

http_proxy
Dict

Configuración del proxy HTTP. Debe ser un diccionario con las siguientes claves: "proxy_hostname" (valor str) y "proxy_port" (valor int). Además, las claves siguientes también pueden estar presentes: "username", "password".

user_agent
str

Si se especifica, se agregará delante de la cadena del agente de usuario integrada.

auto_lock_renewer
Optional[AutoLockRenewer]

Se puede proporcionar un ~azure.servicebus.aio.AutoLockRenewer para que los mensajes se registren automáticamente al recibirlos. Si el receptor es un receptor de sesión, se aplicará a la sesión en su lugar.

prefetch_count
int

Número máximo de mensajes que se van a almacenar en caché con cada solicitud al servicio. Esta configuración solo es para la optimización avanzada del rendimiento. Aumentar este valor mejorará el rendimiento del mensaje, pero aumentará la posibilidad de que los mensajes expiren mientras se almacenan en caché si no se procesan lo suficientemente rápido. El valor predeterminado es 0, lo que significa que los mensajes se recibirán del servicio y se procesarán de uno en uno. En el caso de que prefetch_count sea 0, ServiceBusReceiver.receive intentaría almacenar en caché max_message_count (si se proporciona) dentro de su solicitud al servicio.

client_identifier
str

Identificador basado en cadenas para identificar de forma única la instancia de cliente. Service Bus lo asociará a algunos mensajes de error para facilitar la correlación de errores. Si no se especifica, se generará un identificador único.

socket_timeout
float

Tiempo en segundos que el socket subyacente de la conexión debe esperar al enviar y recibir datos antes de que se agote el tiempo de espera. El valor predeterminado es 0.2 para TransportType.Amqp y 1 para TransportType.AmqpOverWebsocket. Si se producen errores de conexión debido al tiempo de espera de escritura, es posible que sea necesario pasar un valor mayor que el predeterminado.

Variables

fully_qualified_namespace
str

Nombre de host completo para el espacio de nombres de Service Bus. El formato del espacio de nombres es: .servicebus.windows.net.

entity_path
str

Ruta de acceso de la entidad a la que se conecta el cliente.

Métodos

abandon_message

Abandone el mensaje.

Este mensaje se devolverá a la cola y estará disponible para que se reciba de nuevo.

close
complete_message

Complete el mensaje.

Esto quita el mensaje de la cola.

dead_letter_message

Mueva el mensaje a la cola de mensajes fallidos.

La cola de mensajes fallidos es una subcola que se puede usar para almacenar los mensajes que no se pudieron procesar correctamente o, de lo contrario, requerir una inspección o un procesamiento adicionales. La cola también se puede configurar para enviar mensajes expirados a la cola de mensajes fallidos.

defer_message

Aplaza el mensaje.

Este mensaje permanecerá en la cola, pero debe solicitarse específicamente por su número de secuencia para que se reciba.

peek_messages

Examine los mensajes pendientes actualmente en la cola.

Los mensajes inspeccionados no se quitan de la cola ni están bloqueados. No se pueden completar, aplazar o no escribir mensajes fallidos.

receive_deferred_messages

Recibir mensajes que se han aplazado anteriormente.

Al recibir mensajes diferidos de una entidad con particiones, todos los números de secuencia proporcionados deben ser mensajes de la misma partición.

receive_messages

Recibir un lote de mensajes a la vez.

Este enfoque es óptimo si desea procesar varios mensajes simultáneamente o realizar una recepción ad hoc como una sola llamada.

Tenga en cuenta que el número de mensajes recuperados en un solo lote dependerá de si se estableció prefetch_count para el receptor. Si no se establece prefetch_count para el receptor, el receptor intentará almacenar en caché max_message_count mensajes (si se proporcionan) dentro de la solicitud al servicio.

Esta llamada dará prioridad a la devolución rápidamente sobre la reunión de un tamaño de lote especificado, por lo que devolverá tan pronto como se reciba al menos un mensaje y haya un intervalo en los mensajes entrantes, independientemente del tamaño de lote especificado.

renew_message_lock

Renueve el bloqueo del mensaje.

Esto mantendrá el bloqueo en el mensaje para asegurarse de que no se devuelve a la cola que se va a volver a procesar.

Para completar (o liquidar) el mensaje, el bloqueo debe mantenerse y no puede haber expirado; No se puede renovar un bloqueo expirado.

Los mensajes recibidos a través del modo RECEIVE_AND_DELETE no están bloqueados y, por lo tanto, no se pueden renovar. Esta operación solo está disponible para los mensajes que no son de sesión.

abandon_message

Abandone el mensaje.

Este mensaje se devolverá a la cola y estará disponible para que se reciba de nuevo.

async abandon_message(message: ServiceBusReceivedMessage) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a abandonar.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Abandone un mensaje recibido.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.abandon_message(message)

close

async close() -> None

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Complete el mensaje.

Esto quita el mensaje de la cola.

async complete_message(message: ServiceBusReceivedMessage) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a completar.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Complete un mensaje recibido.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.complete_message(message)

dead_letter_message

Mueva el mensaje a la cola de mensajes fallidos.

La cola de mensajes fallidos es una subcola que se puede usar para almacenar los mensajes que no se pudieron procesar correctamente o, de lo contrario, requerir una inspección o un procesamiento adicionales. La cola también se puede configurar para enviar mensajes expirados a la cola de mensajes fallidos.

async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a enviar por mensajes fallidos.

reason
Optional[str]
valor predeterminado: None

Motivo para enviar mensajes fallidos al mensaje.

error_description
Optional[str]
valor predeterminado: None

Descripción detallada del error para enviar mensajes fallidos al mensaje.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Mensaje fallido recibido.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.dead_letter_message(message)

defer_message

Aplaza el mensaje.

Este mensaje permanecerá en la cola, pero debe solicitarse específicamente por su número de secuencia para que se reciba.

async defer_message(message: ServiceBusReceivedMessage) -> None

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje recibido que se va a aplazar.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Aplazar un mensaje recibido.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.defer_message(message)

peek_messages

Examine los mensajes pendientes actualmente en la cola.

Los mensajes inspeccionados no se quitan de la cola ni están bloqueados. No se pueden completar, aplazar o no escribir mensajes fallidos.

async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parámetros

max_message_count
int
valor predeterminado: 1

Número máximo de mensajes que se van a intentar ver. El valor predeterminado es 1.

sequence_number
int

Número de secuencia de mensajes desde el que se van a empezar a examinar los mensajes.

timeout
Optional[float]

Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no hay tiempo de espera.

Devoluciones

Lista de objetos ~azure.servicebus.ServiceBusReceivedMessage.

Tipo de valor devuelto

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Ver los mensajes en la cola.


   async with servicebus_receiver:
       messages = await servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Recibir mensajes que se han aplazado anteriormente.

Al recibir mensajes diferidos de una entidad con particiones, todos los números de secuencia proporcionados deben ser mensajes de la misma partición.

async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parámetros

sequence_numbers
Union[int, list[int]]
Requerido

Lista de los números de secuencia de mensajes que se han aplazado.

timeout
Optional[float]

Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no hay tiempo de espera.

Devoluciones

Una lista de los mensajes recibidos.

Tipo de valor devuelto

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Recibir mensajes diferidos de ServiceBus.


   async with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           await servicebus_receiver.defer_message(message)

       received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for message in received_deferred_msg:
           await servicebus_receiver.complete_message(message)

receive_messages

Recibir un lote de mensajes a la vez.

Este enfoque es óptimo si desea procesar varios mensajes simultáneamente o realizar una recepción ad hoc como una sola llamada.

Tenga en cuenta que el número de mensajes recuperados en un solo lote dependerá de si se estableció prefetch_count para el receptor. Si no se establece prefetch_count para el receptor, el receptor intentará almacenar en caché max_message_count mensajes (si se proporcionan) dentro de la solicitud al servicio.

Esta llamada dará prioridad a la devolución rápidamente sobre la reunión de un tamaño de lote especificado, por lo que devolverá tan pronto como se reciba al menos un mensaje y haya un intervalo en los mensajes entrantes, independientemente del tamaño de lote especificado.

async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Parámetros

max_message_count
Optional[int]
valor predeterminado: 1

Número máximo de mensajes del lote. El número real devuelto dependerá de prefetch_count tamaño y velocidad de flujo entrante. Establecer en Ninguno dependerá completamente de la configuración de captura previa. El valor predeterminado es 1.

max_wait_time
Optional[float]
valor predeterminado: None

Tiempo máximo de espera en segundos para que llegue el primer mensaje. Si no llegan mensajes y no se especifica ningún tiempo de espera, esta llamada no devolverá hasta que se cierre la conexión. Si se especifica y no llegan mensajes dentro del período de tiempo de espera, se devolverá una lista vacía.

Devoluciones

Lista de mensajes recibidos. Si no hay mensajes disponibles, será una lista vacía.

Tipo de valor devuelto

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Excepciones

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Ejemplos

Recibir mensajes de ServiceBus.


   async with servicebus_receiver:
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           await servicebus_receiver.complete_message(message)

renew_message_lock

Renueve el bloqueo del mensaje.

Esto mantendrá el bloqueo en el mensaje para asegurarse de que no se devuelve a la cola que se va a volver a procesar.

Para completar (o liquidar) el mensaje, el bloqueo debe mantenerse y no puede haber expirado; No se puede renovar un bloqueo expirado.

Los mensajes recibidos a través del modo RECEIVE_AND_DELETE no están bloqueados y, por lo tanto, no se pueden renovar. Esta operación solo está disponible para los mensajes que no son de sesión.

async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Parámetros

message
ServiceBusReceivedMessage
Requerido

Mensaje para el que se va a renovar el bloqueo.

timeout
Optional[float]

Tiempo de espera total de la operación en segundos, incluidos todos los reintentos. El valor debe ser mayor que 0 si se especifica. El valor predeterminado es None, lo que significa que no hay tiempo de espera.

Devoluciones

Fecha y hora utc en la que se establece el bloqueo para que expire en .

Tipo de valor devuelto

Excepciones

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Ejemplos

Renueve el bloqueo en un mensaje recibido.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.renew_message_lock(message)

Atributos

client_identifier

Obtenga el identificador de cliente de ServiceBusReceiver asociado a la instancia del receptor.

Tipo de valor devuelto

str

session

Obtenga el objeto ServiceBusSession vinculado con el receptor. La sesión solo está disponible para las entidades habilitadas para la sesión; devolvería None si se llama a en un receptor no con sesión.

Tipo de valor devuelto

Ejemplos

Obtener sesión de un receptor


       async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session