Udostępnij za pośrednictwem


ServiceBusReceiver Klasa

Klasa ServiceBusReceiver definiuje interfejs wysokiego poziomu do odbierania komunikatów z Azure Service Bus kolejki lub subskrypcji tematu.

Dwa główne kanały odbioru komunikatów są odbierane () w celu utworzenia pojedynczego żądania dla komunikatów i komunikatu w odbiorniku: ciągłego odbierania przychodzących komunikatów w sposób ciągły.

Użyj get_<queue/subscription>_receiver metody ~azure.servicebus.ServiceBusClient, aby utworzyć wystąpienie klasy ServiceBusReceiver.

Dziedziczenie
azure.servicebus._base_handler.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

Konstruktor

ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Wymagane

W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Service Bus. Format przestrzeni nazw to: .servicebus.windows.net.

credential
TokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Wymagane

Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje obiekty poświadczeń generowane przez bibliotekę tożsamości platformy Azure i obiekty, które implementują metodę *get_token(self, scopes) lub alternatywnie można podać klasę AzureSasCredential.

queue_name
str

Ścieżka określonej kolejki usługi Service Bus, z którą nawiązuje połączenie klient.

topic_name
str

Ścieżka określonego tematu usługi Service Bus, który zawiera subskrypcję, z którą nawiązuje połączenie klient.

subscription_name
str

Ścieżka określonej subskrypcji usługi Service Bus w określonym temacie, z którymi nawiązuje połączenie klient.

max_wait_time
Optional[float]

Limit czasu w sekundach między odebranych komunikatów, po którym odbiorca automatycznie przestanie odbierać. Wartość domyślna to Brak, co oznacza brak limitu czasu.

receive_mode
Union[ServiceBusReceiveMode, str]

Tryb pobierania komunikatów z jednostki. Dwie opcje są PEEK_LOCK i RECEIVE_AND_DELETE. Komunikaty odebrane za pomocą PEEK_LOCK muszą zostać rozliczone w danym okresie blokady, zanim zostaną usunięte z kolejki. Komunikaty odebrane za pomocą RECEIVE_AND_DELETE zostaną natychmiast usunięte z kolejki i nie mogą zostać następnie porzucone lub ponownie odebrane, jeśli klient nie przetworzy komunikatu. Tryb domyślny to PEEK_LOCK.

logging_enable
bool

Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.

transport_type
TransportType

Typ protokołu transportowego, który będzie używany do komunikacji z usługą Service Bus. Wartość domyślna to TransportType.Amqp.

http_proxy
Dict

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed wbudowanym ciągiem agenta użytkownika.

auto_lock_renewer
Optional[AutoLockRenewer]

Można podać element ~azure.servicebus.AutoLockRenewer, tak aby komunikaty były automatycznie rejestrowane po otrzymaniu. Jeśli odbiornik jest odbiornikiem sesji, będzie on zamiast tego stosowany do sesji.

prefetch_count
int

Maksymalna liczba komunikatów do buforowania przy użyciu każdego żądania do usługi. To ustawienie jest przeznaczone tylko do zaawansowanego dostrajania wydajności. Zwiększenie tej wartości poprawi wydajność przepływności komunikatów, ale zwiększy prawdopodobieństwo wygaśnięcia komunikatów podczas ich buforowania, jeśli nie są one wystarczająco szybko przetwarzane. Wartość domyślna to 0, co oznacza, że komunikaty będą odbierane z usługi i przetwarzane pojedynczo. W przypadku prefetch_count 0 usługa ServiceBusReceiver.receive spróbuje buforować max_message_count (jeśli podano) w żądaniu usługi.

client_identifier
str

Identyfikator oparty na ciągu w celu unikatowego zidentyfikowania wystąpienia klienta. Usługa Service Bus skojarzy ją z niektórymi komunikatami o błędach, aby ułatwić korelację błędów. Jeśli nie zostanie określony, zostanie wygenerowany unikatowy identyfikator.

socket_timeout
float

Czas w sekundach, w którym bazowe gniazdo w połączeniu powinno czekać podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla elementu TransportType.Amqp i 1 dla elementu TransportType.AmqpOverWebsocket. Jeśli występują błędy połączenia z powodu limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna.

Zmienne

fully_qualified_namespace
str

W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Service Bus. Format przestrzeni nazw to: .servicebus.windows.net.

entity_path
str

Ścieżka jednostki, z którą nawiązuje połączenie klient.

Metody

abandon_message

Porzucanie wiadomości.

Ten komunikat zostanie zwrócony do kolejki i udostępniony do ponownego odebrania.

close
complete_message

Ukończ komunikat.

Spowoduje to usunięcie komunikatu z kolejki.

dead_letter_message

Przenieś komunikat do kolejki Dead Letter.

Kolejka Dead Letter to kolejka podrzędna, która może służyć do przechowywania komunikatów, które nie powiodły się poprawnie, lub w inny sposób wymagają dalszej inspekcji lub przetwarzania. Kolejkę można również skonfigurować do wysyłania wygasłych komunikatów do kolejki Utracony list.

defer_message

Wyzywając komunikat.

Ten komunikat pozostanie w kolejce, ale musi być żądany specjalnie przez jego numer sekwencji w celu odebrania.

next
peek_messages

Przeglądaj komunikaty oczekujące obecnie w kolejce.

Podgląd komunikatów nie są usuwane z kolejki ani nie są zablokowane. Nie można ich ukończyć, odroczyć ani odroczyć.

receive_deferred_messages

Odbieraj komunikaty, które zostały wcześniej odroczone.

Podczas odbierania komunikatów odroczonych z jednostki partycjonowanej wszystkie podane numery sekwencji muszą być komunikatami z tej samej partycji.

receive_messages

Odbieraj partię komunikatów jednocześnie.

Takie podejście jest optymalne, jeśli chcesz przetwarzać wiele komunikatów jednocześnie lub wykonywać odbieranie ad hoc jako pojedyncze wywołanie.

Należy pamiętać, że liczba komunikatów pobranych w jednej partii będzie zależna od tego , czy prefetch_count została ustawiona dla odbiornika. Jeśli prefetch_count nie jest ustawiona dla odbiornika, odbiornik spróbuje buforować komunikaty max_message_count (jeśli podano) w żądaniu do usługi.

To wywołanie będzie określać priorytet zwracania szybko przez spotkanie określonego rozmiaru partii, a więc zostanie zwrócony zaraz po odebraniu co najmniej jednego komunikatu i istnieje luka w komunikatach przychodzących niezależnie od określonego rozmiaru partii.

renew_message_lock

Odnów blokadę komunikatu.

Spowoduje to utrzymanie blokady komunikatu, aby upewnić się, że nie jest zwracany do kolejki do ponownego przetwarzania.

Aby zakończyć (lub w inny sposób rozwiązać) komunikat, blokada musi zostać zachowana i nie może już wygasła; Nie można odnowić wygasłej blokady.

Komunikaty odbierane za pośrednictwem trybu RECEIVE_AND_DELETE nie są zablokowane i dlatego nie można ich odnowić. Ta operacja jest dostępna tylko dla komunikatów innych niż sesje.

abandon_message

Porzucanie wiadomości.

Ten komunikat zostanie zwrócony do kolejki i udostępniony do ponownego odebrania.

abandon_message(message: ServiceBusReceivedMessage) -> None

Parametry

message
ServiceBusReceivedMessage
Wymagane

Odebrana wiadomość ma zostać porzucona.

Typ zwracany

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Przykłady

Porzucanie odebranej wiadomości.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.abandon_message(message)

close

close() -> None

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

Ukończ komunikat.

Spowoduje to usunięcie komunikatu z kolejki.

complete_message(message: ServiceBusReceivedMessage) -> None

Parametry

message
ServiceBusReceivedMessage
Wymagane

Odebrana wiadomość ma zostać ukończona.

Typ zwracany

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Przykłady

Ukończ odebraną wiadomość.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.complete_message(message)

dead_letter_message

Przenieś komunikat do kolejki Dead Letter.

Kolejka Dead Letter to kolejka podrzędna, która może służyć do przechowywania komunikatów, które nie powiodły się poprawnie, lub w inny sposób wymagają dalszej inspekcji lub przetwarzania. Kolejkę można również skonfigurować do wysyłania wygasłych komunikatów do kolejki Utracony list.

dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

Parametry

message
ServiceBusReceivedMessage
Wymagane

Odebrana wiadomość została wysłana na martwych listach.

reason
Optional[str]
wartość domyślna: None

Przyczyna zakleszczenia wiadomości.

error_description
Optional[str]
wartość domyślna: None

Szczegółowy opis błędu dotyczący utraconych komunikatów.

Typ zwracany

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Przykłady

Martwych list odebranych wiadomości.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.dead_letter_message(message)

defer_message

Wyzywając komunikat.

Ten komunikat pozostanie w kolejce, ale musi być żądany specjalnie przez jego numer sekwencji w celu odebrania.

defer_message(message: ServiceBusReceivedMessage) -> None

Parametry

message
ServiceBusReceivedMessage
Wymagane

Odebrana wiadomość ma zostać odroczona.

Typ zwracany

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Przykłady

Odroczenie odebranej wiadomości.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.defer_message(message)

next

next()

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

peek_messages

Przeglądaj komunikaty oczekujące obecnie w kolejce.

Podgląd komunikatów nie są usuwane z kolejki ani nie są zablokowane. Nie można ich ukończyć, odroczyć ani odroczyć.

peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parametry

max_message_count
int
wartość domyślna: 1

Maksymalna liczba komunikatów do wypróbowania i podglądu. Wartość domyślna to 1.

sequence_number
int

Numer sekwencji komunikatów, z którego chcesz rozpocząć przeglądanie komunikatów.

timeout
Optional[float]

Łączny limit czasu operacji w sekundach, w tym wszystkie ponawianie prób. Wartość musi być większa niż 0, jeśli zostanie określona. Wartość domyślna to Brak, co oznacza brak limitu czasu.

Zwraca

Lista ~azure.servicebus.ServiceBusReceivedMessage.

Typ zwracany

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Przykłady

Przyjrzyj się oczekującym komunikatom w kolejce.


   with servicebus_receiver:
       messages = servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

Odbieraj komunikaty, które zostały wcześniej odroczone.

Podczas odbierania komunikatów odroczonych z jednostki partycjonowanej wszystkie podane numery sekwencji muszą być komunikatami z tej samej partycji.

receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

Parametry

sequence_numbers
Union[int,List[int]]
Wymagane

Lista sekwencji numerów komunikatów, które zostały odroczone.

timeout
Optional[float]

Łączny limit czasu operacji w sekundach, w tym wszystkie ponawianie prób. Wartość musi być większa niż 0, jeśli zostanie określona. Wartość domyślna to Brak, co oznacza brak limitu czasu.

Zwraca

Lista żądanych wystąpień ~azure.servicebus.ServiceBusReceivedMessage.

Typ zwracany

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Przykłady

Odbieranie odroczonych komunikatów z usługi ServiceBus.


   with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           servicebus_receiver.defer_message(message)

       received_deferred_msg = servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for msg in received_deferred_msg:
           servicebus_receiver.complete_message(msg)

receive_messages

Odbieraj partię komunikatów jednocześnie.

Takie podejście jest optymalne, jeśli chcesz przetwarzać wiele komunikatów jednocześnie lub wykonywać odbieranie ad hoc jako pojedyncze wywołanie.

Należy pamiętać, że liczba komunikatów pobranych w jednej partii będzie zależna od tego , czy prefetch_count została ustawiona dla odbiornika. Jeśli prefetch_count nie jest ustawiona dla odbiornika, odbiornik spróbuje buforować komunikaty max_message_count (jeśli podano) w żądaniu do usługi.

To wywołanie będzie określać priorytet zwracania szybko przez spotkanie określonego rozmiaru partii, a więc zostanie zwrócony zaraz po odebraniu co najmniej jednego komunikatu i istnieje luka w komunikatach przychodzących niezależnie od określonego rozmiaru partii.

receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

Parametry

max_message_count
Optional[int]
wartość domyślna: 1

Maksymalna liczba komunikatów w partii. Zwracana liczba rzeczywista będzie zależeć od prefetch_count i szybkości przychodzącego strumienia. Ustawienie na None będzie w pełni zależeć od konfiguracji przed pobraniem. Wartość domyślna to 1.

max_wait_time
Optional[float]
wartość domyślna: None

Maksymalny czas oczekiwania w sekundach na nadejście pierwszej wiadomości. Jeśli nie zostaną wysłane żadne komunikaty i nie zostanie określony limit czasu, to wywołanie nie zostanie zwrócone, dopóki połączenie nie zostanie zamknięte. Jeśli zostanie określony, w okresie przekroczenia limitu czasu nie zostanie zwrócona pusta lista.

Zwraca

Lista odebranych komunikatów. Jeśli żadne komunikaty nie są dostępne, będzie to pusta lista.

Typ zwracany

Wyjątki

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

Przykłady

Odbieranie komunikatów z usługi ServiceBus.


   with servicebus_receiver:
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           servicebus_receiver.complete_message(message)

renew_message_lock

Odnów blokadę komunikatu.

Spowoduje to utrzymanie blokady komunikatu, aby upewnić się, że nie jest zwracany do kolejki do ponownego przetwarzania.

Aby zakończyć (lub w inny sposób rozwiązać) komunikat, blokada musi zostać zachowana i nie może już wygasła; Nie można odnowić wygasłej blokady.

Komunikaty odbierane za pośrednictwem trybu RECEIVE_AND_DELETE nie są zablokowane i dlatego nie można ich odnowić. Ta operacja jest dostępna tylko dla komunikatów innych niż sesje.

renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

Parametry

message
ServiceBusReceivedMessage
Wymagane

Komunikat o odnowieniu blokady.

timeout
Optional[float]

Łączny limit czasu operacji w sekundach, w tym wszystkie ponawianie prób. Wartość musi być większa niż 0, jeśli zostanie określona. Wartość domyślna to Brak, co oznacza brak limitu czasu.

Zwraca

Data utc, w przypadku gdy blokada zostanie ustawiona na wygaśnięcie.

Typ zwracany

Wyjątki

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

Przykłady

Odnów blokadę odebranej wiadomości.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.renew_message_lock(message)

Atrybuty

client_identifier

Pobierz client_identifier ServiceBusReceiver skojarzone z wystąpieniem odbiorcy.

Typ zwracany

str

session

Pobierz obiekt ServiceBusSession połączony z odbiornikiem. Sesja jest dostępna tylko dla jednostek z włączoną sesją. Zwraca ona wartość Brak, jeśli jest wywoływana w odbiorniku nieseksesyjnym.

Typ zwracany

Przykłady

Pobieranie sesji z odbiornika


       with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session