EventHubConsumerClient Klasa
Klasa EventHubConsumerClient definiuje interfejs wysokiego poziomu do odbierania zdarzeń z usługi Azure Event Hubs.
Głównym celem klasy EventHubConsumerClient jest odbieranie zdarzeń ze wszystkich partycji usługi EventHub z równoważeniem obciążenia i tworzeniem punktów kontrolnych.
Jeśli wiele wystąpień EventHubConsumerClient jest uruchomionych względem tego samego centrum zdarzeń, grupy odbiorców i lokalizacji tworzenia punktów kontrolnych, partycje będą równomiernie dystrybuowane między nimi.
Aby włączyć równoważenie obciążenia i utrwalone punkty kontrolne, checkpoint_store należy ustawić podczas tworzenia klasy EventHubConsumerClient. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny zostanie zachowany wewnętrznie w pamięci.
Obiekt EventHubConsumerClient może również odbierać z określonej partycji po wywołaniu metody receive() lub receive_batch() i określić partition_id. Równoważenie obciążenia nie będzie działać w trybie pojedynczej partycji. Jednak użytkownicy nadal mogą zapisywać punkty kontrolne, jeśli ustawiono checkpoint_store.
- Dziedziczenie
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parametry
- fully_qualified_namespace
- str
W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Format przestrzeni nazw to: .servicebus.windows.net.
- credential
- AsyncTokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę tożsamości platformy Azure i obiekty implementujące metodę *get_token(self, scopes).
- logging_enable
- bool
Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po wystąpieniu błędu zgłoszonego przez wewnętrzną metodę odbierania w pętli czasowej. Jeśli próby ponawiania prób zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Nieudany wewnętrzny odbiorca partycji zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy wewnętrzny odbiorca partycji zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie ma dalszych działań. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
- http_proxy
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int).
- checkpoint_store
- Optional[CheckpointStore]
Menedżer, który przechowuje dane modułu równoważenia obciążenia partycji i punktu kontrolnego podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.
- load_balancing_interval
- float
Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwoma ocenami równoważenia obciążenia. Wartość domyślna to 30 sekund.
- partition_ownership_expiration_interval
- float
Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, czyli 180 sekund podczas korzystania z domyślnej load_balancing_interval 30 sekund.
- load_balancing_strategy
- str lub LoadBalancingStrategy
Po rozpoczęciu równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla strategii chciwości, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nie odzyskanych partycji wymaganych do równoważenia obciążenia. Użyj wartości "zrównoważony" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która w przypadku każdej oceny równoważenia obciążenia twierdzi tylko jedną partycję, która nie jest zgłaszana przez inną usługę EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są obsługiwane przez inne klasy EventHubConsumerClient , a ten klient twierdził zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby każdej oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Strategia chciwości jest domyślnie używana.
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
- socket_timeout
- float
Czas w sekundach, w którym bazowe gniazdo w połączeniu powinno czekać podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla elementu TransportType.Amqp i 1 dla elementu TransportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError z powodu limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.
Przykłady
Utwórz nowe wystąpienie klasy EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metody
close |
Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie usługi AMQP i linki. |
from_connection_string |
Utwórz element EventHubConsumerClient na podstawie parametry połączenia. |
get_eventhub_properties |
Pobierz właściwości centrum zdarzeń. Klucze w zwracanym słowniku obejmują:
|
get_partition_ids |
Pobierz identyfikatory partycji centrum zdarzeń. |
get_partition_properties |
Pobierz właściwości określonej partycji. Klucze w słowniku właściwości obejmują:
|
receive |
Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi. |
receive_batch |
Odbieranie zdarzeń z partycji w partiach z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi. |
close
Zatrzymaj pobieranie zdarzeń z centrum zdarzeń i zamknij bazowe połączenie usługi AMQP i linki.
async close() -> None
Typ zwracany
Przykłady
Zamknij klienta.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
Utwórz element EventHubConsumerClient na podstawie parametry połączenia.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Parametry
- eventhub_name
- str
Ścieżka określonego centrum zdarzeń do połączenia klienta.
- logging_enable
- bool
Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.
- http_proxy
- dict
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3. Kontekst retry_total odbierania jest specjalny: metoda odbierania jest implementowana przez pętlę czasową wywołującą wewnętrzną metodę odbierania w każdej iteracji. W przypadku odbieraniaretry_total określa liczbę ponownych prób po wystąpieniu błędu zgłoszonego przez wewnętrzną metodę odbierania w pętli czasowej. Jeśli próby ponawiania prób zostaną wyczerpane, wywołanie zwrotne on_error zostanie wywołane (jeśli podano) z informacjami o błędzie. Nieudany wewnętrzny odbiorca partycji zostanie zamknięty (on_partition_close zostanie wywołany, jeśli zostanie podany), a nowy wewnętrzny odbiorca partycji zostanie utworzony (on_partition_initialize zostanie wywołany, jeśli zostanie podany), aby wznowić odbieranie.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie ma dalszych działań. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
- checkpoint_store
- Optional[CheckpointStore]
Menedżer, który przechowuje dane modułu równoważenia obciążenia partycji i punktu kontrolnego podczas odbierania zdarzeń. Magazyn punktów kontrolnych będzie używany w obu przypadkach odbierania ze wszystkich partycji lub jednej partycji. W drugim przypadku równoważenie obciążenia nie ma zastosowania. Jeśli nie podano magazynu punktów kontrolnych, punkt kontrolny będzie utrzymywany wewnętrznie w pamięci, a wystąpienie EventHubConsumerClient będzie odbierać zdarzenia bez równoważenia obciążenia.
- load_balancing_interval
- float
Po rozpoczęciu równoważenia obciążenia. Jest to interwał w sekundach między dwoma ocenami równoważenia obciążenia. Wartość domyślna to 30 sekund.
- partition_ownership_expiration_interval
- float
Własność partycji wygaśnie po tej liczbie sekund. Każda ocena równoważenia obciążenia automatycznie wydłuży czas wygaśnięcia własności. Wartość domyślna to 6 * load_balancing_interval, czyli 180 sekund podczas korzystania z domyślnej load_balancing_interval 30 sekund.
- load_balancing_strategy
- str lub LoadBalancingStrategy
Po rozpoczęciu równoważenia obciążenia ta strategia będzie używana do oświadczeń i równoważenia własności partycji. Użyj "chciwości" lub LoadBalancingStrategy.GREEDY dla strategii chciwości, która dla każdej oceny równoważenia obciążenia będzie pobierać tyle nie odzyskanych partycji wymaganych do równoważenia obciążenia. Użyj wartości "zrównoważony" lub LoadBalancingStrategy.BALANCED dla strategii zrównoważonej, która w przypadku każdej oceny równoważenia obciążenia twierdzi tylko jedną partycję, która nie jest zgłaszana przez inną usługę EventHubConsumerClient. Jeśli wszystkie partycje usługi EventHub są obsługiwane przez inne klasy EventHubConsumerClient , a ten klient twierdził zbyt mało partycji, ten klient ukradnie jedną partycję z innych klientów na potrzeby każdej oceny równoważenia obciążenia niezależnie od strategii równoważenia obciążenia. Strategia chciwości jest domyślnie używana.
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
Typ zwracany
Przykłady
Utwórz nowe wystąpienie klasy EventHubConsumerClient z parametry połączenia.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Pobierz właściwości centrum zdarzeń.
Klucze w zwracanym słowniku obejmują:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (lista[str])
async get_eventhub_properties() -> Dict[str, Any]
Zwraca
Słownik zawierający informacje o centrum zdarzeń.
Typ zwracany
Wyjątki
get_partition_ids
Pobierz identyfikatory partycji centrum zdarzeń.
async get_partition_ids() -> List[str]
Zwraca
Lista identyfikatorów partycji.
Typ zwracany
Wyjątki
get_partition_properties
Pobierz właściwości określonej partycji.
Klucze w słowniku właściwości obejmują:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametry
Zwraca
Słownik zawierający właściwości partycji.
Typ zwracany
Wyjątki
receive
Odbieranie zdarzeń z partycji z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametry
- on_event
- Callable[PartitionContext, Optional[EventData]]
Funkcja wywołania zwrotnego do obsługi odebranego zdarzenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera kontekst partycji i zdarzenie , które jest odebrane zdarzenie. Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_event(partition_context, event). Aby uzyskać szczegółowe informacje o kontekście partycji, zobacz PartitionContext.
- max_wait_time
- float
Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event zostanie wywołane z brakiem. Jeśli ta wartość jest ustawiona na Wartość Brak lub 0 (wartość domyślna), wywołanie zwrotne nie będzie wywoływane do momentu odebrania zdarzenia.
- partition_id
- str
Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma od wszystkich partycji.
- owner_level
- int
Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Konsument o wyższym owner_level ma wyższy wyłączny priorytet. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.
- prefetch
- int
Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.
- track_last_enqueued_event_properties
- bool
Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego podejmowania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Jest ona domyślnie ustawiona na wartość Fałsz .
Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń.
Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). Prawda dla inkluzywnego i fałszu dla wyłącznych. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest włącznie, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.
- on_error
- Callable[[PartitionContext, Exception]]
Funkcja wywołania zwrotnego, która zostanie wywołana w przypadku wystąpienia błędu podczas odbierania po ponowieniu prób, zostanie wyczerpana lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i błąd jest wyjątkiem. partition_context może być brakiem, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane tak, jak: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.
- on_partition_initialize
- Callable[[PartitionContext]]
Funkcja wywołania zwrotnego, która będzie wywoływana po odbiorcy dla określonej partycji, kończy inicjowanie. Zostanie również wywołana, gdy zostanie utworzony nowy wewnętrzny odbiorca partycji, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej i zakończonego niepowodzeniem. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Zostanie również wywołana w przypadku wystąpienia błędu podczas odbierania po wyczerpaniu prób ponawiania prób. Wywołanie zwrotne przyjmuje dwa parametry: partition_context , który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, przyczyna). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.
Typ zwracany
Przykłady
Odbieranie zdarzeń z usługi EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Odbieranie zdarzeń z partycji w partiach z opcjonalnym równoważeniem obciążenia i punktami kontrolnymi.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parametry
- on_event_batch
- Callable[PartitionContext, List[EventData]]
Funkcja wywołania zwrotnego do obsługi partii odebranych zdarzeń. Wywołanie zwrotne przyjmuje dwa parametry: partition_context który zawiera kontekst partycji i event_batch, czyli odebrane zdarzenia. Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_event_batch(partition_context, event_batch). event_batch może być pustą listą, jeśli max_wait_time nie ma wartości None ani 0 i żadne zdarzenie nie zostanie odebrane po max_wait_time. Aby uzyskać szczegółowe informacje o kontekście partycji, zapoznaj się z tematem PartitionContext.
- max_batch_size
- int
Maksymalna liczba zdarzeń w partii przekazanych do wywołania zwrotnego on_event_batch. Jeśli rzeczywista liczba odebranych zdarzeń jest większa niż max_batch_size, odebrane zdarzenia są podzielone na partie i wywołania zwrotne dla każdej partii z maksymalnie max_batch_size zdarzeniami.
- max_wait_time
- float
Maksymalny interwał w sekundach oczekiwania procesora zdarzeń przed wywołaniem wywołania zwrotnego. Jeśli w tym interwale nie zostaną odebrane żadne zdarzenia, wywołanie zwrotne on_event_batch zostanie wywołane z pustą listą. Jeśli ta wartość jest ustawiona na Wartość Brak lub 0 (wartość domyślna), wywołanie zwrotne nie będzie wywoływane do momentu odebrania zdarzeń.
- partition_id
- str
Jeśli zostanie określony, klient otrzyma tylko z tej partycji. W przeciwnym razie klient otrzyma z wszystkich partycji.
- owner_level
- int
Priorytet dla wyłącznego konsumenta. W przypadku ustawienia owner_level zostanie utworzony wyłączny konsument. Użytkownik o wyższym owner_level ma wyższy priorytet wyłączny. Poziom właściciela jest również znany jako "wartość epoki" konsumenta.
- prefetch
- int
Liczba zdarzeń do wstępnego pobrania z usługi na potrzeby przetwarzania. Wartość domyślna to 300.
- track_last_enqueued_event_properties
- bool
Wskazuje, czy odbiorca powinien zażądać informacji o ostatnim zdarzeniu w kolejce na skojarzonej partycji i śledzić te informacje w miarę odbierania zdarzeń. Gdy są śledzone informacje o zdarzeniach ostatnio w kolejce partycji, każde zdarzenie odebrane z usługi Event Hubs będzie zawierać metadane dotyczące partycji. Powoduje to niewielkie użycie dodatkowej przepustowości sieci, które jest zazwyczaj korzystnym kompromisem w przypadku okresowego wysyłania żądań dotyczących właściwości partycji przy użyciu klienta centrum zdarzeń. Domyślnie jest ustawiona wartość Fałsz .
Rozpocznij odbieranie z tej pozycji zdarzenia, jeśli nie ma danych punktu kontrolnego dla partycji. Dane punktu kontrolnego będą używane, jeśli są dostępne. Może to być dykt z identyfikatorem partycji jako kluczem i pozycją jako wartością poszczególnych partycji lub pojedynczą wartością dla wszystkich partycji. Typ wartości może być typu str, int lub datetime.datetime. Obsługiwane są również wartości "-1" do odbierania od początku strumienia i "@latest" do odbierania tylko nowych zdarzeń.
Ustal, czy dana starting_position jest inkluzywna(>=) czy nie (>). True dla inclusive i False dla wyłączność. Może to być dykt z identyfikatorem partycji jako kluczem i wartością logiczną wskazującą, czy starting_position dla określonej partycji jest inkluzywna, czy nie. Może to być również pojedyncza wartość logiczna dla wszystkich starting_position. Wartość domyślna to False.
- on_error
- Callable[[PartitionContext, Exception]]
Funkcja wywołania zwrotnego, która zostanie wywołana, gdy podczas odbierania po ponowieniu prób zostanie zgłoszony błąd lub podczas procesu równoważenia obciążenia. Wywołanie zwrotne przyjmuje dwa parametry: partition_context zawierające informacje o partycji i błąd będący wyjątkiem. partition_context może mieć wartość Brak, jeśli błąd jest zgłaszany podczas procesu równoważenia obciążenia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_error(partition_context, błąd). Wywołanie zwrotne on_error będzie również wywoływane, jeśli podczas wywołania zwrotnego on_event zostanie zgłoszony nieobsługiwany wyjątek.
- on_partition_initialize
- Callable[[PartitionContext]]
Funkcja wywołania zwrotnego, która zostanie wywołana po odbiorcy dla określonej partycji, zakończy inicjowanie. Zostanie on również wywołany, gdy zostanie utworzony nowy odbiorca partycji wewnętrznej, aby przejąć proces odbierania dla użytkownika partycji wewnętrznej zakończonego niepowodzeniem i zamkniętego użytkownika partycji wewnętrznej. Wywołanie zwrotne przyjmuje jeden parametr: partition_context który zawiera informacje o partycji. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Funkcja wywołania zwrotnego, która zostanie wywołana po zamknięciu odbiorcy dla określonej partycji. Jest to również wywoływane, gdy podczas odbierania błędu po zakończeniu ponownych prób zostanie zgłoszony błąd. Wywołanie zwrotne przyjmuje dwa parametry: partition_context który zawiera informacje o partycji i przyczynę zamknięcia. Wywołanie zwrotne powinno być zdefiniowane w następujący sposób: on_partition_close(partition_context, reason). Zapoznaj się z CloseReason różnymi przyczynami zamknięcia.
Typ zwracany
Przykłady
Odbieranie zdarzeń w partiach z usługi EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python