EventHubProducerClient Klasa
Klasa EventHubProducerClient definiuje interfejs wysokiego poziomu do wysyłania zdarzeń do usługi Azure Event Hubs.
- Dziedziczenie
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
Konstruktor
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
Parametry
- fully_qualified_namespace
- str
W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Prawdopodobnie będzie to podobne do .servicebus.windows.net
- credential
- AsyncTokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę tożsamości platformy Azure i obiekty implementujące metodę *get_token(self, scopes).
- buffered_mode
- bool
Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, efektywnie wsadowy, a następnie publikować. Wartość domyślna to Fałsz.
Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:
zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane
partition_id: identyfikator partycji, do którego zostały opublikowane zdarzenia na liście.
Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_success(events, partition_id). Wymagane, gdy buffered_mode ma wartość True, jeśli buffered_mode jest fałszem.
Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Wymagane, gdy w buffered_mode ma wartość True, jeśli buffered_mode jest fałszem. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(events, partition_id, error), gdzie:
zdarzenia: lista zdarzeń, których nie można opublikować,
partition_id: identyfikator partycji, do którego podjęto próbę opublikowania zdarzeń na liście i
błąd: wyjątek związany z niepowodzeniem wysyłania.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,
następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.
Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,
następnie domyślnie zostanie zgłoszony błąd.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.
Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.
- max_buffer_length
- int
Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które można buforować przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.
Tylko tryb buforowany. Czas oczekiwania na utworzenie partii z zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.
- logging_enable
- bool
Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie istnieje żadne działanie. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
- http_proxy
- dict
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
- socket_timeout
- float
Czas w sekundach, w którym bazowe gniazdo w połączeniu powinno czekać podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla elementu TransportType.Amqp i 1 dla elementu TransportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError z powodu limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.
Przykłady
Utwórz nowe wystąpienie klasy EventHubProducerClient.
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metody
close |
Zamknij klienta producenta bazowego połączenia AMQP i linków. |
create_batch |
Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes. Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę. |
flush |
Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym. |
from_connection_string |
Utwórz element EventHubProducerClient na podstawie parametry połączenia. |
get_buffered_event_count |
Liczba zdarzeń buforowanych i oczekujących na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w tle coroutine, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń. |
get_eventhub_properties |
Pobierz właściwości centrum zdarzeń. Klucze w zwracanym słowniku obejmują:
|
get_partition_ids |
Pobierz identyfikatory partycji centrum zdarzeń. |
get_partition_properties |
Pobierz właściwości określonej partycji. Klucze w słowniku właściwości obejmują:
|
send_batch |
Wysyła partię danych zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenia w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne wysyłanie w tle. Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
W trybie buforowania wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń. Jeśli wysyłasz ograniczoną listę zdarzeńData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć element EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim, aż do limitu rozmiaru, a następnie wywołaj tę metodę, aby wysłać partię. |
send_event |
Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenie w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne przetwarzanie wsadowe i wysyłanie w tle. Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane i błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.
|
close
Zamknij klienta producenta bazowego połączenia AMQP i linków.
async close(*, flush: bool = True, **kwargs: Any) -> None
Parametry
- flush
- bool
Tylko tryb buforowany. W przypadku ustawienia wartości True zdarzenia w buforze będą wysyłane natychmiast. Wartość domyślna to True.
Tylko tryb buforowany. Limit czasu zamknięcia producenta. Wartość domyślna to Brak, co oznacza brak limitu czasu.
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
Przykłady
Zamknij program obsługi.
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes.
Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę.
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
Przykłady
Tworzenie obiektu EventDataBatch w ograniczonym rozmiarze
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym.
async flush(**kwargs: Any) -> None
Parametry
Limit czasu opróżniania buforowanych zdarzeń, wartość domyślna to Brak, co oznacza brak limitu czasu.
Typ zwracany
Wyjątki
Jeśli producent nie opróżni buforu w danym przedziale czasu w trybie buforowym.
from_connection_string
Utwórz element EventHubProducerClient na podstawie parametry połączenia.
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
Parametry
- eventhub_name
- str
Ścieżka określonego centrum zdarzeń do połączenia klienta.
- buffered_mode
- bool
Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, efektywnie wsadowy, a następnie publikować. Wartość domyślna to Fałsz.
Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:
zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane
partition_id: identyfikator partycji, do którego zostały opublikowane zdarzenia na liście.
Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_success(events, partition_id). Jest to wymagane, gdy buffered_mode ma wartość True, jeśli buffered_mode ma wartość False.
Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(events, partition_id, error), gdzie:
zdarzenia: lista zdarzeń, których nie można opublikować,
partition_id: identyfikator partycji, do którego podjęto próbę opublikowania zdarzeń na liście i
błąd: wyjątek związany z niepowodzeniem wysyłania.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,
następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.
Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,
następnie domyślnie zostanie zgłoszony błąd.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.
Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.
- max_buffer_length
- int
Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które można buforować przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.
Tylko tryb buforowany. Czas oczekiwania na utworzenie partii z zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.
- logging_enable
- bool
Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.
- http_proxy
- dict
Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".
- auth_timeout
- float
Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.
- user_agent
- str
Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.
- retry_total
- int
Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3.
- retry_backoff_factor
- float
Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.
- retry_backoff_max
- float
Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).
- retry_mode
- str
Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".
- idle_timeout
- float
Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie istnieje żadne działanie. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.
- transport_type
- TransportType
Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.
Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.
Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.
- uamqp_transport
- bool
Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
Przykłady
Utwórz nowe wystąpienie klasy EventHubProducerClient z parametry połączenia.
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
Liczba zdarzeń buforowanych i oczekujących na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w tle coroutine, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń.
get_buffered_event_count(partition_id: str) -> int | None
Parametry
Typ zwracany
Wyjątki
Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.
get_eventhub_properties
Pobierz właściwości centrum zdarzeń.
Klucze w zwracanym słowniku obejmują:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (lista[str])
async get_eventhub_properties() -> Dict[str, Any]
Zwraca
Słownik zawierający informacje o centrum zdarzeń.
Typ zwracany
Wyjątki
get_partition_ids
Pobierz identyfikatory partycji centrum zdarzeń.
async get_partition_ids() -> List[str]
Zwraca
Lista identyfikatorów partycji.
Typ zwracany
Wyjątki
get_partition_properties
Pobierz właściwości określonej partycji.
Klucze w słowniku właściwości obejmują:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametry
Zwraca
Dykt właściwości partycji.
Typ zwracany
Wyjątki
send_batch
Wysyła partię danych zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenia w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne wysyłanie w tle.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:
Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,
następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, który następnie zostanie wywołany.
Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,
następnie błąd zostanie zgłoszony domyślnie.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:
Jeśli nie można umieścić zdarzeń w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.
Jeśli zdarzenia nie będą wysyłane po pomyślnym zapisie w kolejce, wywołanie zwrotne on_error zostanie wywołane.
W trybie buforowania wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń.
Jeśli wysyłasz ograniczoną listę zdarzeńData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć element EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim, aż do limitu rozmiaru, a następnie wywołaj tę metodę, aby wysłać partię.
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
Parametry
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Obiekt EventDataBatch , który ma zostać wysłany lub lista zdarzeń, które mają być wysyłane w partii. Wszystkie zdarzenia EventData lub AmqpAnnotatedMessage na liście lub EventDataBatch zostaną wylądować na tej samej partycji.
- timeout
- float
Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie niebuforowany lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowym. W trybie bez buforowania domyślny czas oczekiwania określony podczas tworzenia producenta zostanie użyty. W trybie buforowym domyślny czas oczekiwania to Brak.
- partition_id
- str
Określony identyfikator partycji do wysłania. Wartość domyślna to Brak, w którym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_id, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ samo zdarzenie EventDataBatch ma partition_id.
- partition_key
- str
W przypadku danego partition_key dane zdarzeń zostaną wysłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_key, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama funkcja EventDataBatch ma partition_key. Jeśli zostaną podane zarówno partition_id, jak i partition_key, partition_id będą miały pierwszeństwo. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń do wysłania jest niezalecane, ponieważ partition_key zostanie zignorowana przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key być tylko typem ciągu, mogą one nie przeanalizować wartości innej niż ciąg.
Typ zwracany
Wyjątki
Jeśli wartość określona przez parametr limitu czasu upłynął przed wysłaniem zdarzenia w trybie niebuforowym lub zdarzenia mogą zostać umieszczone w kolejce do buforowanego w trybie buforowym.
Przykłady
Asynchronicznie wysyła dane zdarzeń
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenie w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne przetwarzanie wsadowe i wysyłanie w tle.
Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane i błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
Parametry
- timeout
- float
Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie bez buforowania lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowania. W trybie bez buforowania zostanie użyty domyślny czas oczekiwania określony podczas tworzenia producenta. W trybie buforowym domyślny czas oczekiwania to Brak.
- partition_id
- str
Określony identyfikator partycji do wysłania. Wartość domyślna to Brak. W tym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli określono partition_id i event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama klasa EventDataBatch ma partition_id.
- partition_key
- str
W przypadku danej partition_key dane zdarzeń będą wysyłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli określono partition_key i event_data_batch jest zdarzeniem EventDataBatch , ponieważ sama klasa EventDataBatch ma partition_key. Jeśli podano zarówno partition_id, jak i partition_key, pierwszeństwo będzie mieć partition_id. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń do wysłania jest niezalecane, ponieważ partition_key zostanie zignorowana przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key być tylko typem ciągu, mogą one nie przeanalizować wartości innej niż ciąg.
Typ zwracany
Wyjątki
Jeśli wartość określona przez parametr limitu czasu upłynie przed wysłaniem zdarzenia w trybie bez buforowania lub zdarzenia nie mogą być umieszczone w kolejce do buforowanego w trybie buforowania.
Atrybuty
total_buffered_event_count
Łączna liczba zdarzeń, które są obecnie buforowane i oczekujące na opublikowanie, we wszystkich partycjach. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w tle coroutine, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu.
Typ zwracany
Azure SDK for Python