Udostępnij za pośrednictwem


EventHubConsumerClient Klasa

Klasa EventHubConsumerClient definiuje interfejs wysokiego poziomu do odbierania zdarzeń z usługi Azure Event Hubs.

Głównym celem klasy EventHubConsumerClient jest odbieranie zdarzeń ze wszystkich partycji usługi EventHub z równoważeniem obciążenia i tworzeniem punktów kontrolnych.

Jeśli wiele wystąpień EventHubConsumerClient jest uruchomionych względem tego samego centrum zdarzeń, grupy odbiorców i lokalizacji tworzenia punktów kontrolnych, partycje będą równomiernie dystrybuowane między nimi.

Aby włączyć równoważenie obciążenia i utrwalone punkty kontrolne, checkpoint_store należy ustawić podczas tworzenia klasy EventHubConsumerClient. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny zostanie zachowany wewnętrznie w pamięci.

Obiekt EventHubConsumerClient może również odbierać z określonej partycji po wywołaniu metody receive() lub receive_batch() i określić partition_id. Równoważenie obciążenia nie będzie działać w trybie pojedynczej partycji. Jednak użytkownicy nadal mogą zapisywać punkty kontrolne, jeśli ustawiono checkpoint_store.

Dziedziczenie
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Wymagane

W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Format przestrzeni nazw to: .servicebus.windows.net.

eventhub_name
str
Wymagane

Ścieżka określonego centrum zdarzeń do połączenia klienta.

consumer_group
str
Wymagane

Odbieranie zdarzeń z centrum zdarzeń dla tej grupy odbiorców.

credential
AsyncTokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Wymagane

Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę tożsamości platformy Azure i obiekty implementujące metodę *get_token(self, scopes).

logging_enable
bool

Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.

auth_timeout
float

Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po wystąpieniu błędu zgłoszonego przez wewnętrzną metodę odbierania w pętli czasowej. Jeśli próby ponawiania prób zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Nieudany wewnętrzny odbiorca partycji zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy wewnętrzny odbiorca partycji zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie ma dalszych działań. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

http_proxy

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int).

checkpoint_store
Optional[CheckpointStore]

Menedżer, który przechowuje dane modułu równoważenia obciążenia partycji i punktu kontrolnego podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.

load_balancing_interval
float

Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwoma ocenami równoważenia obciążenia. Wartość domyślna to 30 sekund.

partition_ownership_expiration_interval
float

Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, czyli 180 sekund podczas korzystania z domyślnej load_balancing_interval 30 sekund.

load_balancing_strategy
str lub LoadBalancingStrategy

Po rozpoczęciu równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla strategii chciwości, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nie odzyskanych partycji wymaganych do równoważenia obciążenia. Użyj wartości "zrównoważony" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która w przypadku każdej oceny równoważenia obciążenia twierdzi tylko jedną partycję, która nie jest zgłaszana przez inną usługę EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są obsługiwane przez inne klasy EventHubConsumerClient , a ten klient twierdził zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby każdej oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Strategia chciwości jest domyślnie używana.

custom_endpoint_address
Optional[str]

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.

connection_verify
Optional[str]

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

socket_timeout
float

Czas w sekundach, w którym bazowe gniazdo w połączeniu powinno czekać podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla elementu TransportType.Amqp i 1 dla elementu TransportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError z powodu limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.

Przykłady

Utwórz nowe wystąpienie klasy EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metody

close

Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie usługi AMQP i linki.

from_connection_string

Utwórz element EventHubConsumerClient na podstawie parametry połączenia.

get_eventhub_properties

Pobierz właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (lista[str])

get_partition_ids

Pobierz identyfikatory partycji centrum zdarzeń.

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

receive_batch

Odbieranie zdarzeń z partycji w partiach z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

close

Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie usługi AMQP i linki.

async close() -> None

Typ zwracany

Przykłady

Zamknij klienta.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Utwórz element EventHubConsumerClient na podstawie parametry połączenia.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parametry

conn_str
str
Wymagane

Parametry połączenia centrum zdarzeń.

consumer_group
str
Wymagane

Odbieranie zdarzeń z centrum zdarzeń dla tej grupy odbiorców.

eventhub_name
str

Ścieżka określonego centrum zdarzeń do połączenia klienta.

logging_enable
bool

Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.

http_proxy
dict

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

auth_timeout
float

Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po wystąpieniu błędu zgłoszonego przez wewnętrzną metodę odbierania w pętli czasowej. Jeśli próby ponawiania prób zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Nieudany wewnętrzny odbiorca partycji zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy wewnętrzny odbiorca partycji zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie ma dalszych działań. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

checkpoint_store
Optional[CheckpointStore]

Menedżer, który przechowuje dane modułu równoważenia obciążenia partycji i punktu kontrolnego podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.

load_balancing_interval
float

Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwoma ocenami równoważenia obciążenia. Wartość domyślna to 30 sekund.

partition_ownership_expiration_interval
float

Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, czyli 180 sekund podczas korzystania z domyślnej load_balancing_interval 30 sekund.

load_balancing_strategy
str lub LoadBalancingStrategy

Po rozpoczęciu równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla strategii chciwości, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nie odzyskanych partycji wymaganych do równoważenia obciążenia. Użyj wartości "zrównoważony" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która w przypadku każdej oceny równoważenia obciążenia twierdzi tylko jedną partycję, która nie jest zgłaszana przez inną usługę EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są obsługiwane przez inne klasy EventHubConsumerClient , a ten klient twierdził zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby każdej oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Strategia chciwości jest domyślnie używana.

custom_endpoint_address
Optional[str]

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.

connection_verify
Optional[str]

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

Typ zwracany

Przykłady

Utwórz nowe wystąpienie klasy EventHubConsumerClient z parametry połączenia.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Pobierz właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (lista[str])

async get_eventhub_properties() -> Dict[str, Any]

Zwraca

Słownik zawierający informacje o centrum zdarzeń.

Typ zwracany

Wyjątki

get_partition_ids

Pobierz identyfikatory partycji centrum zdarzeń.

async get_partition_ids() -> List[str]

Zwraca

Lista identyfikatorów partycji.

Typ zwracany

Wyjątki

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametry

partition_id
str
Wymagane

Identyfikator partycji docelowej.

Zwraca

Słownik zawierający właściwości partycji.

Typ zwracany

Wyjątki

receive

Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametry

on_event
Callable[PartitionContext, Optional[EventData]]
Wymagane

Funkcja wywołania zwrotnego do obsługi odebranego zdarzenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera kontekst partycji i zdarzenie , które jest odebrane zdarzenie. Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_event(partition_context, event). Aby uzyskać szczegółowe informacje o kontekście partycji, zobacz PartitionContext.

max_wait_time
float

Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event zostanie wywołane z brakiem. Jeśli ta wartość jest ustawiona na Wartość Brak lub 0 (wartość domyślna), wywołanie zwrotne nie będzie wywoływane do momentu odebrania zdarzenia.

partition_id
str

Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma od wszystkich partycji.

owner_level
int

Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Konsument o wyższym owner_level ma wyższy wyłączny priorytet. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.

prefetch
int

Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.

track_last_enqueued_event_properties
bool

Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego podejmowania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Jest ona domyślnie ustawiona na wartość Fałsz .

starting_position
str, int, datetime lub dict[str,any]

Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń.

starting_position_inclusive
bool lub dict[str,bool]

Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). Prawda dla inkluzywnego i fałszu dla wyłącznych. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest włącznie, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.

on_error
Callable[[PartitionContext, Exception]]

Funkcja wywołania zwrotnego, która zostanie wywołana w przypadku wystąpienia błędu podczas odbierania po ponowieniu prób, zostanie wyczerpana lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i błąd jest wyjątkiem. partition_context może być brakiem, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane tak, jak: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.

on_partition_initialize
Callable[[PartitionContext]]

Funkcja wywołania zwrotnego, która będzie wywoływana po odbiorcy dla określonej partycji, kończy inicjowanie. Zostanie również wywołana, gdy zostanie utworzony nowy wewnętrzny odbiorca partycji, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej i zakończonego niepowodzeniem. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Zostanie również wywołana w przypadku wystąpienia błędu podczas odbierania po wyczerpaniu prób ponawiania prób. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, przyczyna). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.

Typ zwracany

Przykłady

Odbieranie zdarzeń z usługi EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Odbieranie zdarzeń z partycji w partiach z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parametry

on_event_batch
Callable[PartitionContext, List[EventData]]
Wymagane

Funkcja wywołania zwrotnego do obsługi partii odebranych zdarzeń. Wywołanie zwrotne przyjmuje dwa parametry: partition_context który zawiera kontekst partycji i event_batch, czyli odebrane zdarzenia. Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_event_batch(partition_context, event_batch). event_batch może być pustą listą, jeśli max_wait_time nie ma wartości None ani 0 i żadne zdarzenie nie zostanie odebrane po max_wait_time. Aby uzyskać szczegółowe informacje o kontekście partycji, zapoznaj się z tematem PartitionContext.

max_batch_size
int

Maksymalna liczba zdarzeń w partii przekazanych do wywołania zwrotnego on_event_batch. Jeśli rzeczywista liczba odebranych zdarzeń jest większa niż max_batch_size, odebrane zdarzenia są podzielone na partie i wywołania zwrotne dla każdej partii z maksymalnie max_batch_size zdarzeniami.

max_wait_time
float

Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event_batch zostanie wywołane z pustą listą. Jeśli ta wartość jest ustawiona na Wartość Brak lub 0 (wartość domyślna), wywołanie zwrotne nie będzie wywoływane do momentu odebrania zdarzeń.

partition_id
str

Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma z wszystkich partycji.

owner_level
int

Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Użytkownik o wyższym owner_level ma wyższy priorytet wyłączny. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.

prefetch
int

Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.

track_last_enqueued_event_properties
bool

Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach ostatnio w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego wysyłania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Domyślnie jest ustawiona wartość Fałsz .

starting_position
str, int, datetime lub dict[str,any]

Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być typu str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń.

starting_position_inclusive
bool lub dict[str,bool]

Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). True dla inclusive i False dla wyłączność. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest inkluzywna, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.

on_error
Callable[[PartitionContext, Exception]]

Funkcja wywołania zwrotnego, która zostanie wywołana, gdy podczas odbierania po ponowieniu prób zostanie zgłoszony błąd lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context zawierające informacje o partycji i błąd będący wyjątkiem. partition_context może mieć wartość Brak, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.

on_partition_initialize
Callable[[PartitionContext]]

Funkcja wywołania zwrotnego, która zostanie wywołana po odbiorcy dla określonej partycji, zakończy inicjowanie. Zostanie on również wywołany, gdy zostanie utworzony nowy odbiorca partycji wewnętrznej, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej zakończonego niepowodzeniem i zamkniętego użytkownika partycji wewnętrznej. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Jest to również wywoływane, gdy podczas odbierania błędu po zakończeniu ponownych prób zostanie zgłoszony błąd. Wywołanie zwrotne przyjmuje dwa parametry: partition_context który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, reason). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.

Typ zwracany

Przykłady

Odbieranie zdarzeń w partiach z usługi EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )