Freigeben über


ServiceBusReceiver Klasse

Die ServiceBusReceiver-Klasse definiert eine allgemeine Schnittstelle zum Empfangen von Nachrichten aus dem Azure Service Bus Queue oder Topic Subscription.

Die beiden primären Kanäle für den Nachrichtenempfang sind receive(), um eine einzelne Anforderung für Nachrichten zu stellen, und asynchron für Nachrichten im Empfänger: um eingehende Nachrichten kontinuierlich zu empfangen.

Verwenden Sie die get_<queue/subscription>_receiver Methode von ~azure.servicebus.aio.ServiceBusClient, um eine ServiceBusReceiver-instance zu erstellen.

Vererbung
ServiceBusReceiver
azure.servicebus.aio._base_handler_async.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Konstruktor

ServiceBusReceiver(fully_qualified_namespace: str, credential: AsyncTokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Parameter

fully_qualified_namespace
str
Erforderlich

Der vollqualifizierte Hostname für den Service Bus-Namespace. Das Namespaceformat ist . servicebus.windows.net.

credential
AsyncTokenCredential oder AzureSasCredential oder AzureNamedKeyCredential
Erforderlich

Das für die Authentifizierung verwendete Anmeldeinformationsobjekt, das eine bestimmte Schnittstelle zum Abrufen von Token implementiert. Es akzeptiert Anmeldeinformationsobjekte, die von der azure-identity-Bibliothek generiert werden, und Objekte, die die *get_token(self, scopes) -Methode implementieren, oder alternativ kann auch ein AzureSasCredential-Objekt bereitgestellt werden.

queue_name
str

Der Pfad einer bestimmten Service Bus-Warteschlange, mit der der Client eine Verbindung herstellt.

topic_name
str

Der Pfad eines bestimmten Service Bus-Themas, das das Abonnement enthält, mit dem der Client eine Verbindung herstellt.

subscription_name
str

Der Pfad eines bestimmten Service Bus-Abonnements unter dem angegebenen Thema, mit dem der Client eine Verbindung herstellt.

receive_mode
Union[ServiceBusReceiveMode, str]

Der Modus, in dem Nachrichten von der Entität abgerufen werden. Die beiden Optionen sind PEEK_LOCK und RECEIVE_AND_DELETE. Nachrichten, die mit PEEK_LOCK empfangen werden, müssen innerhalb eines bestimmten Sperrzeitraums abgerechnet werden, bevor sie aus der Warteschlange entfernt werden. Nachrichten, die mit RECEIVE_AND_DELETE empfangen werden, werden sofort aus der Warteschlange entfernt und können anschließend nicht abgebrochen oder erneut empfangen werden, wenn der Client die Nachricht nicht verarbeitet. Der Standardmodus ist PEEK_LOCK.

max_wait_time
Optional[float]

Das Timeout in Sekunden zwischen empfangenen Nachrichten, nach dem der Empfänger automatisch den Empfang beendet. Der Standardwert ist None, d. h. kein Timeout.

logging_enable
bool

Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.

transport_type
TransportType

Der Typ des Transportprotokolls, das für die Kommunikation mit dem Service Bus-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp.

http_proxy
Dict

HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".

user_agent
str

Wenn angegeben, wird dies vor der integrierten Benutzer-Agent-Zeichenfolge hinzugefügt.

auto_lock_renewer
Optional[AutoLockRenewer]

Ein ~azure.servicebus.aio.AutoLockRenewer kann so bereitgestellt werden, dass Nachrichten beim Empfang automatisch registriert werden. Wenn der Empfänger ein Sitzungsempfänger ist, gilt er stattdessen für die Sitzung.

prefetch_count
int

Die maximale Anzahl von Nachrichten, die mit jeder Anforderung an den Dienst zwischengespeichert werden sollen. Diese Einstellung ist nur für die erweiterte Leistungsoptimierung vorgesehen. Wenn Sie diesen Wert erhöhen, wird die Leistung des Nachrichtendurchsatzes verbessert, aber die Gefahr erhöht, dass Nachrichten ablaufen, während sie zwischengespeichert werden, wenn sie nicht schnell genug verarbeitet werden. Der Standardwert ist 0, d. h. Nachrichten werden vom Dienst empfangen und nacheinander verarbeitet. Im Fall, dass prefetch_count 0 ist, versucht ServiceBusReceiver.receive , max_message_count (sofern angegeben) innerhalb der Anforderung an den Dienst zwischenzuspeichern.

client_identifier
str

Ein Zeichenfolgenbasierter Bezeichner zur eindeutigen Identifizierung des Clients instance. Service Bus ordnet sie einigen Fehlermeldungen zu, um die Korrelation von Fehlern zu vereinfachen. Wenn nicht angegeben, wird eine eindeutige ID generiert.

socket_timeout
float

Die Zeit in Sekunden, die der zugrunde liegende Socket für die Verbindung beim Senden und Empfangen von Daten warten soll, bevor ein Timeout auftritt. Der Standardwert ist 0,2 für TransportType.Amqp und 1 für TransportType.AmqpOverWebsocket. Wenn Verbindungsfehler aufgrund eines Schreibzeitlimits auftreten, muss möglicherweise ein größer als der Standardwert übergeben werden.

Variablen

fully_qualified_namespace
str

Der vollqualifizierte Hostname für den Service Bus-Namespace. Das Namespaceformat ist .servicebus.windows.net.

entity_path
str

Der Pfad der Entität, mit der der Client eine Verbindung herstellt.

Methoden

abandon_message

Verlassen Sie die Nachricht.

Diese Nachricht wird an die Warteschlange zurückgegeben und für den Erneuten Empfang zur Verfügung gestellt.

close
complete_message

Vervollständigen Sie die Nachricht.

Dadurch wird die Nachricht aus der Warteschlange entfernt.

dead_letter_message

Verschieben Sie die Nachricht in die Warteschlange für unzustellbare Nachrichten.

Die Warteschlange für unzustellbare Nachrichten ist eine Untergeordnete Warteschlange, die zum Speichern von Nachrichten verwendet werden kann, die nicht ordnungsgemäß verarbeitet werden konnten oder anderweitig eine weitere Überprüfung oder Verarbeitung erfordern. Die Warteschlange kann auch so konfiguriert werden, dass abgelaufene Nachrichten an die Warteschlange für unzustellbare Nachrichten gesendet werden.

defer_message

Verschiebt die Nachricht.

Diese Nachricht verbleibt in der Warteschlange, muss jedoch speziell von ihrer Sequenznummer angefordert werden, um empfangen zu werden.

peek_messages

Durchsuchen von Nachrichten, die derzeit in der Warteschlange ausstehen.

Eingesehene Nachrichten werden weder aus der Warteschlange entfernt noch gesperrt. Sie können nicht abgeschlossen, verzögert oder unzustellbar sein.

receive_deferred_messages

Empfangen von Nachrichten, die zuvor verzögert wurden.

Wenn Sie verzögerte Nachrichten von einer partitionierten Entität empfangen, müssen alle angegebenen Sequenznummern Nachrichten aus derselben Partition sein.

receive_messages

Empfangen sie einen Batch von Nachrichten gleichzeitig.

Dieser Ansatz ist optimal, wenn Sie mehrere Nachrichten gleichzeitig verarbeiten oder einen Ad-hoc-Empfang als einzelnen Anruf ausführen möchten.

Beachten Sie, dass die Anzahl der in einem einzelnen Batch abgerufenen Nachrichten davon abhängt, ob prefetch_count für den Empfänger festgelegt wurde. Wenn prefetch_count nicht für den Empfänger festgelegt ist, versucht der Empfänger, max_message_count Nachrichten (sofern angegeben) innerhalb der Anforderung an den Dienst zwischenzuspeichern.

Dieser Aufruf priorisiert die schnelle Rückgabe gegenüber der Erfüllung einer angegebenen Batchgröße und wird daher zurückgegeben, sobald mindestens eine Nachricht empfangen wird und unabhängig von der angegebenen Batchgröße eine Lücke bei eingehenden Nachrichten besteht.

renew_message_lock

Erneuern Sie die Nachrichtensperre.

Dadurch wird die Sperre für die Nachricht beibehalten, um sicherzustellen, dass sie nicht an die neu zu verarbeitende Warteschlange zurückgegeben wird.

Um die Nachricht abzuschließen (oder anderweitig zu begleichen), muss die Sperre beibehalten werden und kann nicht bereits abgelaufen sein. eine abgelaufene Sperre kann nicht verlängert werden.

Nachrichten, die über RECEIVE_AND_DELETE-Modus empfangen werden, sind nicht gesperrt und können daher nicht verlängert werden. Dieser Vorgang ist auch nur für nicht sitzungsbehaftete Nachrichten verfügbar.

abandon_message

Verlassen Sie die Nachricht.

Diese Nachricht wird an die Warteschlange zurückgegeben und für den Erneuten Empfang zur Verfügung gestellt.

async abandon_message(message: ServiceBusReceivedMessage) -> None

Parameter

message
ServiceBusReceivedMessage
Erforderlich

Die empfangene Nachricht, die abgebrochen werden soll.

Rückgabetyp

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Beispiele

Geben Sie eine empfangene Nachricht ab.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.abandon_message(message)

close

async close() -> None

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Vervollständigen Sie die Nachricht.

Dadurch wird die Nachricht aus der Warteschlange entfernt.

async complete_message(message: ServiceBusReceivedMessage) -> None

Parameter

message
ServiceBusReceivedMessage
Erforderlich

Die empfangene Nachricht, die abgeschlossen werden soll.

Rückgabetyp

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Beispiele

Vervollständigen Sie eine empfangene Nachricht.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.complete_message(message)

dead_letter_message

Verschieben Sie die Nachricht in die Warteschlange für unzustellbare Nachrichten.

Die Warteschlange für unzustellbare Nachrichten ist eine Untergeordnete Warteschlange, die zum Speichern von Nachrichten verwendet werden kann, die nicht ordnungsgemäß verarbeitet werden konnten oder anderweitig eine weitere Überprüfung oder Verarbeitung erfordern. Die Warteschlange kann auch so konfiguriert werden, dass abgelaufene Nachrichten an die Warteschlange für unzustellbare Nachrichten gesendet werden.

async dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Parameter

message
ServiceBusReceivedMessage
Erforderlich

Die empfangene Nachricht, die unzustellbar ist.

reason
Optional[str]
Standardwert: None

Der Grund für die Unzustellbare Nachricht.

error_description
Optional[str]
Standardwert: None

Die detaillierte Fehlerbeschreibung für unzustellbare Nachrichten.

Rückgabetyp

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Beispiele

Unzustellbarer Brief einer empfangenen Nachricht.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.dead_letter_message(message)

defer_message

Verschiebt die Nachricht.

Diese Nachricht verbleibt in der Warteschlange, muss jedoch speziell von ihrer Sequenznummer angefordert werden, um empfangen zu werden.

async defer_message(message: ServiceBusReceivedMessage) -> None

Parameter

message
ServiceBusReceivedMessage
Erforderlich

Die empfangene Nachricht, die verzögert werden soll.

Rückgabetyp

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Beispiele

Zurückstellen einer empfangenen Nachricht.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.defer_message(message)

peek_messages

Durchsuchen von Nachrichten, die derzeit in der Warteschlange ausstehen.

Eingesehene Nachrichten werden weder aus der Warteschlange entfernt noch gesperrt. Sie können nicht abgeschlossen, verzögert oder unzustellbar sein.

async peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parameter

max_message_count
int
Standardwert: 1

Die maximale Anzahl von Nachrichten, die versucht und eingesehen werden sollen. Der Standardwert ist 1.

sequence_number
int

Eine Nachrichtensequenznummer, von der aus das Durchsuchen von Nachrichten gestartet werden soll.

timeout
Optional[float]

Das Gesamtzeitlimit des Vorgangs in Sekunden, einschließlich aller Wiederholungen. Der Wert muss größer als 0 sein, wenn er angegeben wird. Der Standardwert ist None, d. h. kein Timeout.

Gibt zurück

Eine Liste der ~azure.servicebus.ServiceBusReceivedMessage-Objekte.

Rückgabetyp

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Beispiele

Anzeigen von Nachrichten in der Warteschlange


   async with servicebus_receiver:
       messages = await servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Empfangen von Nachrichten, die zuvor verzögert wurden.

Wenn Sie verzögerte Nachrichten von einer partitionierten Entität empfangen, müssen alle angegebenen Sequenznummern Nachrichten aus derselben Partition sein.

async receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parameter

sequence_numbers
Union[int, list[int]]
Erforderlich

Eine Liste der Sequenznummern von Nachrichten, die verzögert wurden.

timeout
Optional[float]

Das Gesamtzeitlimit des Vorgangs in Sekunden, einschließlich aller Wiederholungen. Der Wert muss größer als 0 sein, wenn er angegeben wird. Der Standardwert ist None, d. h. kein Timeout.

Gibt zurück

Eine Liste der empfangenen Nachrichten.

Rückgabetyp

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Beispiele

Empfangen von verzögerten Nachrichten von ServiceBus.


   async with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           await servicebus_receiver.defer_message(message)

       received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for message in received_deferred_msg:
           await servicebus_receiver.complete_message(message)

receive_messages

Empfangen sie einen Batch von Nachrichten gleichzeitig.

Dieser Ansatz ist optimal, wenn Sie mehrere Nachrichten gleichzeitig verarbeiten oder einen Ad-hoc-Empfang als einzelnen Anruf ausführen möchten.

Beachten Sie, dass die Anzahl der in einem einzelnen Batch abgerufenen Nachrichten davon abhängt, ob prefetch_count für den Empfänger festgelegt wurde. Wenn prefetch_count nicht für den Empfänger festgelegt ist, versucht der Empfänger, max_message_count Nachrichten (sofern angegeben) innerhalb der Anforderung an den Dienst zwischenzuspeichern.

Dieser Aufruf priorisiert die schnelle Rückgabe gegenüber der Erfüllung einer angegebenen Batchgröße und wird daher zurückgegeben, sobald mindestens eine Nachricht empfangen wird und unabhängig von der angegebenen Batchgröße eine Lücke bei eingehenden Nachrichten besteht.

async receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Parameter

max_message_count
Optional[int]
Standardwert: 1

Maximale Anzahl von Nachrichten im Batch. Die tatsächliche zurückgegebene Zahl hängt von prefetch_count Größe und eingehender Datenstromrate ab. Die Einstellung auf Keine hängt vollständig von der Vorabrufkonfiguration ab. Der Standardwert ist 1.

max_wait_time
Optional[float]
Standardwert: None

Maximale Wartezeit in Sekunden, bis die erste Nachricht eintrifft. Wenn keine Nachrichten eingehen und kein Timeout angegeben wird, wird dieser Aufruf erst zurückgegeben, wenn die Verbindung geschlossen wird. Wenn angegeben und innerhalb des Timeoutzeitraums keine Nachrichten eingehen, wird eine leere Liste zurückgegeben.

Gibt zurück

Eine Liste der empfangenen Nachrichten. Wenn keine Nachrichten verfügbar sind, ist dies eine leere Liste.

Rückgabetyp

list[<xref:azure.servicebus.aio.ServiceBusReceivedMessage>]

Ausnahmen

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Beispiele

Empfangen von Nachrichten von ServiceBus.


   async with servicebus_receiver:
       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           await servicebus_receiver.complete_message(message)

renew_message_lock

Erneuern Sie die Nachrichtensperre.

Dadurch wird die Sperre für die Nachricht beibehalten, um sicherzustellen, dass sie nicht an die neu zu verarbeitende Warteschlange zurückgegeben wird.

Um die Nachricht abzuschließen (oder anderweitig zu begleichen), muss die Sperre beibehalten werden und kann nicht bereits abgelaufen sein. eine abgelaufene Sperre kann nicht verlängert werden.

Nachrichten, die über RECEIVE_AND_DELETE-Modus empfangen werden, sind nicht gesperrt und können daher nicht verlängert werden. Dieser Vorgang ist auch nur für nicht sitzungsbehaftete Nachrichten verfügbar.

async renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Parameter

message
ServiceBusReceivedMessage
Erforderlich

Die Nachricht, für die die Sperre verlängert werden soll.

timeout
Optional[float]

Das Gesamtzeitlimit des Vorgangs in Sekunden, einschließlich aller Wiederholungen. Der Wert muss größer als 0 sein, wenn er angegeben wird. Der Standardwert ist None, d. h. kein Timeout.

Gibt zurück

Die utc datetime, bei der die Sperre abläuft.

Rückgabetyp

Ausnahmen

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Beispiele

Erneuern Sie die Sperre für eine empfangene Nachricht.


       messages = await servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           await servicebus_receiver.renew_message_lock(message)

Attribute

client_identifier

Rufen Sie den ServiceBusReceiver-Clientbezeichner ab, der dem Empfänger instance zugeordnet ist.

Rückgabetyp

str

session

Rufen Sie das ServiceBusSession-Objekt ab, das mit dem Empfänger verknüpft ist. Die Sitzung ist nur für sitzungsfähige Entitäten verfügbar. Sie würde Keine zurückgeben, wenn sie auf einem nicht sitzungsfähigen Empfänger aufgerufen wird.

Rückgabetyp

Beispiele

Abrufen einer Sitzung von einem Empfänger


       async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session