EventHubProducerClient Klasa
Klasa EventHubProducerClient definiuje interfejs wysokiego poziomu do wysyłania zdarzeń do usługi Azure Event Hubs.
- Dziedziczenie
-
azure.eventhub._client_base.ClientBaseEventHubProducerClient
Konstruktor
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)
Parametry
- fully_qualified_namespace
- str
W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Prawdopodobnie będzie to podobne do .servicebus.windows.net
- credential
- TokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę tożsamości platformy Azure i obiekty implementujące metodę *get_token(self, scopes).
- buffered_mode
- bool
Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, efektywnie wsadowy, a następnie publikować. Wartość domyślna to Fałsz.
Element ThreadPoolExecutor, który ma być używany do publikowania zdarzeń lub liczby procesów roboczych dla klasy ThreadPoolExecutor. Wartość domyślna to Brak, a element ThreadPoolExecutor z domyślną liczbą procesów roboczych zostanie utworzony na https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:
zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane
partition_id: identyfikator partycji, do którego zostały opublikowane zdarzenia na liście.
Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_success(events, partition_id). Jest to wymagane, gdy buffered_mode ma wartość True, jeśli buffered_mode ma wartość False.
Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(events, partition_id, error), gdzie:
zdarzenia: lista zdarzeń, których nie można opublikować,
partition_id: identyfikator partycji, do którego podjęto próbę opublikowania zdarzeń na liście i
błąd: wyjątek związany z niepowodzeniem wysyłania.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,
następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.
Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,
następnie domyślnie zostanie zgłoszony błąd.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.
Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.
- max_buffer_length
- int
Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które można buforować przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.
Tylko tryb buforowany. Czas oczekiwania na utworzenie partii z zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.
- logging_enable
- bool
Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie istnieje żadne działanie. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
- http_proxy
- Dict
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
- socket_timeout
- float
Czas w sekundach, w którym bazowe gniazdo w połączeniu powinno czekać podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla elementu TransportType.Amqp i 1 dla elementu TransportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError z powodu limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.
Przykłady
Utwórz nowe wystąpienie klasy EventHubProducerClient.
import os
from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
producer = EventHubProducerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name, # EventHub name should be specified if it doesn't show up in connection string.
credential=credential
)
Metody
close |
Zamknij klienta producenta bazowego połączenia AMQP i linków. |
create_batch |
Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes. Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę. |
flush |
Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym. |
from_connection_string |
Utwórz element EventHubProducerClient na podstawie parametry połączenia. |
get_buffered_event_count |
Liczba zdarzeń, które są buforowane i oczekujące na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: Bufor zdarzeń jest przetwarzany w wątku w tle, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń. |
get_eventhub_properties |
Pobieranie właściwości centrum zdarzeń. Klucze w zwracanym słowniku obejmują:
|
get_partition_ids |
Pobieranie identyfikatorów partycji centrum zdarzeń. |
get_partition_properties |
Pobierz właściwości określonej partycji. Klucze w słowniku właściwości obejmują:
|
send_batch |
Wysyła partię danych zdarzeń. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym. Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
W trybie buforowym wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń. Jeśli wysyłasz skończone listy eventData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć zdarzenie EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim do limitu rozmiaru, a następnie wywołać tę metodę, aby wysłać partię. |
send_event |
Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym. Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.
|
close
Zamknij klienta producenta bazowego połączenia AMQP i linków.
close(*, flush: bool = True, **kwargs: Any) -> None
Parametry
- flush
- bool
Tylko tryb buforowany. W przypadku ustawienia wartości True zdarzenia w buforze będą wysyłane natychmiast. Wartość domyślna to True.
Tylko tryb buforowany. Limit czasu zamknięcia producenta. Wartość domyślna to Brak, co oznacza brak limitu czasu.
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
Przykłady
Zamknij klienta.
import os
from azure.eventhub import EventHubProducerClient, EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# EventDataBatch object reaches max_size.
# New EventDataBatch object can be created here to send more data
break
producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
producer.close()
create_batch
Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes.
Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę.
create_batch(**kwargs: Any) -> EventDataBatch
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
Przykłady
Tworzenie obiektu EventDataBatch w ograniczonym rozmiarze
event_data_batch = producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym.
flush(**kwargs: Any) -> None
Parametry
Limit czasu opróżniania buforowanych zdarzeń, wartość domyślna to Brak, co oznacza brak limitu czasu.
Typ zwracany
Wyjątki
Jeśli producent nie opróżni buforu w danym przedziale czasu w trybie buforowym.
from_connection_string
Utwórz element EventHubProducerClient na podstawie parametry połączenia.
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient
Parametry
- eventhub_name
- str
Ścieżka określonego centrum zdarzeń w celu połączenia klienta z.
- buffered_mode
- bool
Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, wydajnie wsadowo, a następnie publikować. Wartość domyślna to False.
Element ThreadPoolExecutor, który ma być używany do publikowania zdarzeń lub liczby procesów roboczych dla klasy ThreadPoolExecutor. Wartość domyślna to Brak, a element ThreadPoolExecutor z domyślną liczbą procesów roboczych zostanie utworzony na https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:
zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane
partition_id: identyfikator partycji, do którego opublikowano zdarzenia na liście.
Funkcja wywołania zwrotnego powinna być zdefiniowana na przykład: on_success(events, partition_id). Wymagana, jeśli buffered_mode ma wartość True, a opcjonalne, jeśli buffered_mode ma wartość False.
Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Wymagane, gdy w buffered_mode ma wartość True, a opcjonalne, jeśli buffered_mode ma wartość False. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(zdarzenia, partition_id, błąd), gdzie:
zdarzenia: lista zdarzeń, których nie można opublikować,
partition_id: identyfikator partycji, do którego próbowano opublikować zdarzenia na liście i
błąd: wyjątek związany z niepowodzeniem wysyłania.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,
następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, który następnie zostanie wywołany.
Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,
następnie błąd zostanie zgłoszony domyślnie.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
Jeśli nie można umieścić zdarzeń w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.
Jeśli zdarzenia nie będą wysyłane po pomyślnym zapisie w kolejce, wywołanie zwrotne on_error zostanie wywołane.
- max_buffer_length
- int
Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które mogą być buforowane przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.
Tylko tryb buforowany. Czas oczekiwania na utworzenie partii ze zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.
- logging_enable
- bool
Określa, czy dane wyjściowe dzienników śledzenia sieci do rejestratora. Wartość domyślna to False.
- http_proxy
- Dict
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość ciągu) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzowanie tokenu przez usługę. Wartość domyślna to 60 sekund. W przypadku ustawienia wartości 0 nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji zakończonych niepowodzeniem w przypadku wystąpienia błędu. Wartość domyślna to 3.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnień). W trybie stałym zasady ponawiania zawsze będą w stanie uśpienia dla składnika {backoff}. W trybie wykładniczym zasady ponawiania zostaną uśpine dla: {współczynnik wycofywania} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie zostanie spane dla wartości [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wycofywania. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Zachowanie opóźnienia między ponownymi próbami. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Limit czasu (w sekundach), po którym klient zamknie połączenie bazowe, jeśli nie ma żadnej aktywności. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportowego, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
- http_proxy
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość ciągu) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiający kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie wyglądać następująco: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie będzie używany port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to Brak, w którym przypadku zostanie użyty element certifi.where().
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
Przykłady
Utwórz nowe wystąpienie klasy EventHubProducerClient z parametry połączenia.
import os
from azure.eventhub import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
Liczba zdarzeń, które są buforowane i oczekujące na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: Bufor zdarzeń jest przetwarzany w wątku w tle, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń.
get_buffered_event_count(partition_id: str) -> int | None
Parametry
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
get_eventhub_properties
Pobieranie właściwości centrum zdarzeń.
Klucze w zwracanym słowniku obejmują:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Zwraca
Słownik zawierający właściwości usługi EventHub.
Typ zwracany
Wyjątki
get_partition_ids
Pobieranie identyfikatorów partycji centrum zdarzeń.
get_partition_ids() -> List[str]
Zwraca
Lista identyfikatorów partycji.
Typ zwracany
Wyjątki
get_partition_properties
Pobierz właściwości określonej partycji.
Klucze w słowniku właściwości obejmują:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (wartość logiczna)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametry
Zwraca
Słownik właściwości partycji.
Typ zwracany
Wyjątki
send_batch
Wysyła partię danych zdarzeń. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,
następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.
Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,
następnie domyślnie zostanie zgłoszony błąd.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.
Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.
W trybie buforowym wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń.
Jeśli wysyłasz skończone listy eventData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć zdarzenie EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim do limitu rozmiaru, a następnie wywołać tę metodę, aby wysłać partię.
send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
Parametry
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Obiekt EventDataBatch do wysłania lub lista zdarzeń do wysłania w partii. Wszystkie zdarzenia EventData lub AmqpAnnotatedMessage na liście lub EventDataBatch trafią na tę samą partycję.
- timeout
- float
Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie bez buforowania lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowania. W trybie bez buforowania zostanie użyty domyślny czas oczekiwania określony podczas tworzenia producenta. W trybie buforowym domyślny czas oczekiwania to Brak.
- partition_id
- str
Określony identyfikator partycji do wysłania. Wartość domyślna to Brak, w którym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_id, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ samo zdarzenie EventDataBatch ma partition_id.
- partition_key
- str
W przypadku danego partition_key dane zdarzeń zostaną wysłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_key, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama funkcja EventDataBatch ma partition_key. Jeśli zostaną podane zarówno partition_id, jak i partition_key, partition_id będą miały pierwszeństwo. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń, które mają być wysyłane, jest odradzane, ponieważ partition_key zostaną zignorowane przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key mają być tylko typem ciągu, mogą nie przeanalizować wartości innej niż ciąg.
Typ zwracany
Wyjątki
Jeśli wartość określona przez parametr limitu czasu upłynie przed wysłaniem zdarzenia w trybie bez buforowania lub zdarzenia nie mogą być umieszczone w kolejce do buforowanego w trybie buforowania.
Przykłady
Wysyła dane zdarzenia
with producer:
event_data_batch = producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# EventDataBatch object reaches max_size.
# New EventDataBatch object can be created here to send more data
break
producer.send_batch(event_data_batch)
send_event
Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli obiekt EventHubProducerClient jest skonfigurowany do uruchamiania w trybie buforowanym, metoda spróbuje w kolejce zdarzeń do buforu w danym czasie, jeśli zostanie określony i zwrócony. Producent wykona automatyczne wysyłanie w tle w trybie buforowym.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
Parametry
- event_data
- Union[EventData, AmqpAnnotatedMessage]
Obiekt EventData , który ma zostać wysłany.
- timeout
- float
Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie niebuforowany lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowym. W trybie bez buforowania domyślny czas oczekiwania określony podczas tworzenia producenta zostanie użyty. W trybie buforowym domyślny czas oczekiwania to Brak.
- partition_id
- str
Określony identyfikator partycji do wysłania. Wartość domyślna to Brak, w którym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_id, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ samo zdarzenie EventDataBatch ma partition_id.
- partition_key
- str
W przypadku danego partition_key dane zdarzeń zostaną wysłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_key, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama funkcja EventDataBatch ma partition_key. Jeśli zostaną podane zarówno partition_id, jak i partition_key, partition_id będą miały pierwszeństwo. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń, które mają być wysyłane, jest odradzane, ponieważ partition_key zostaną zignorowane przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key mają być tylko typem ciągu, mogą nie przeanalizować wartości innej niż ciąg.
Typ zwracany
Wyjątki
Jeśli wartość określona przez parametr limitu czasu upłynął przed wysłaniem zdarzenia w trybie niebuforowym lub zdarzenia mogą zostać umieszczone w kolejce do buforowanego w trybie buforowym.
Atrybuty
total_buffered_event_count
Łączna liczba zdarzeń, które są obecnie buforowane i oczekujące na opublikowanie, we wszystkich partycjach. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w wątku w tle, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu.
Typ zwracany
Azure SDK for Python