Udostępnij za pośrednictwem


EventHubProducerClient Klasa

Klasa EventHubProducerClient definiuje interfejs wysokiego poziomu do wysyłania zdarzeń do usługi Azure Event Hubs.

Dziedziczenie
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Konstruktor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Wymagane

W pełni kwalifikowana nazwa hosta dla przestrzeni nazw usługi Event Hubs. Prawdopodobnie będzie to podobne do .servicebus.windows.net

eventhub_name
str
Wymagane

Ścieżka określonego centrum zdarzeń do połączenia klienta.

credential
AsyncTokenCredential lub AzureSasCredential lub AzureNamedKeyCredential
Wymagane

Obiekt poświadczeń używany do uwierzytelniania, który implementuje określony interfejs do pobierania tokenów. Akceptuje EventHubSharedKeyCredentialobiekty , lub poświadczenia generowane przez bibliotekę tożsamości platformy Azure i obiekty implementujące metodę *get_token(self, scopes).

buffered_mode
bool

Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, efektywnie wsadowy, a następnie publikować. Wartość domyślna to Fałsz.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:

  • zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane

  • partition_id: identyfikator partycji, do którego zostały opublikowane zdarzenia na liście.

Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_success(events, partition_id). Wymagane, gdy buffered_mode ma wartość True, jeśli buffered_mode jest fałszem.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Wymagane, gdy w buffered_mode ma wartość True, jeśli buffered_mode jest fałszem. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(events, partition_id, error), gdzie:

  • zdarzenia: lista zdarzeń, których nie można opublikować,

  • partition_id: identyfikator partycji, do którego podjęto próbę opublikowania zdarzeń na liście i

  • błąd: wyjątek związany z niepowodzeniem wysyłania.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie domyślnie zostanie zgłoszony błąd.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.

max_buffer_length
int

Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które można buforować przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.

max_wait_time
Optional[float]

Tylko tryb buforowany. Czas oczekiwania na utworzenie partii z zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.

logging_enable
bool

Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.

auth_timeout
float

Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie istnieje żadne działanie. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

http_proxy
dict

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

custom_endpoint_address
Optional[str]

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.

connection_verify
Optional[str]

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

socket_timeout
float

Czas w sekundach, w którym bazowe gniazdo w połączeniu powinno czekać podczas wysyłania i odbierania danych przed przekroczeniem limitu czasu. Wartość domyślna to 0,2 dla elementu TransportType.Amqp i 1 dla elementu TransportType.AmqpOverWebsocket. Jeśli występują błędy EventHubsConnectionError z powodu limitu czasu zapisu, może być konieczne przekazanie większej niż wartość domyślna. Jest to przeznaczone dla zaawansowanych scenariuszy użycia i zwykle wartość domyślna powinna być wystarczająca.

Przykłady

Utwórz nowe wystąpienie klasy EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metody

close

Zamknij klienta producenta bazowego połączenia AMQP i linków.

create_batch

Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes.

Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę.

flush

Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym.

from_connection_string

Utwórz element EventHubProducerClient na podstawie parametry połączenia.

get_buffered_event_count

Liczba zdarzeń buforowanych i oczekujących na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w tle coroutine, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń.

get_eventhub_properties

Pobierz właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (lista[str])

get_partition_ids

Pobierz identyfikatory partycji centrum zdarzeń.

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Wysyła partię danych zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenia w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne wysyłanie w tle.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, który następnie zostanie wywołany.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie błąd zostanie zgłoszony domyślnie.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli nie można umieścić zdarzeń w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym zapisie w kolejce, wywołanie zwrotne on_error zostanie wywołane.

W trybie buforowania wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń.

Jeśli wysyłasz ograniczoną listę zdarzeńData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć element EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim, aż do limitu rozmiaru, a następnie wywołaj tę metodę, aby wysłać partię.

send_event

Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenie w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne przetwarzanie wsadowe i wysyłanie w tle.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane i błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Zamknij klienta producenta bazowego połączenia AMQP i linków.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parametry

flush
bool

Tylko tryb buforowany. W przypadku ustawienia wartości True zdarzenia w buforze będą wysyłane natychmiast. Wartość domyślna to True.

timeout
float lub None

Tylko tryb buforowany. Limit czasu zamknięcia producenta. Wartość domyślna to Brak, co oznacza brak limitu czasu.

Typ zwracany

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

Przykłady

Zamknij program obsługi.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Utwórz obiekt EventDataBatch o maksymalnym rozmiarze całej zawartości ograniczanej przez max_size_in_bytes.

Max_size_in_bytes nie powinna być większa niż maksymalny dozwolony rozmiar komunikatu zdefiniowany przez usługę.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Typ zwracany

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

Przykłady

Tworzenie obiektu EventDataBatch w ograniczonym rozmiarze


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Tylko tryb buforowany. Opróżnij zdarzenia w buforze, które mają być wysyłane natychmiast, jeśli klient działa w trybie buforowym.

async flush(**kwargs: Any) -> None

Parametry

timeout
float lub None

Limit czasu opróżniania buforowanych zdarzeń, wartość domyślna to Brak, co oznacza brak limitu czasu.

Typ zwracany

Wyjątki

Jeśli producent nie opróżni buforu w danym przedziale czasu w trybie buforowym.

from_connection_string

Utwórz element EventHubProducerClient na podstawie parametry połączenia.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parametry

conn_str
str
Wymagane

Parametry połączenia centrum zdarzeń.

eventhub_name
str

Ścieżka określonego centrum zdarzeń do połączenia klienta.

buffered_mode
bool

Jeśli wartość True, klient producenta będzie zbierać zdarzenia w buforze, efektywnie wsadowy, a następnie publikować. Wartość domyślna to Fałsz.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Wywołanie zwrotne, które ma być wywoływane po pomyślnym opublikowaniu partii. Wywołanie zwrotne przyjmuje dwa parametry:

  • zdarzenia: lista zdarzeń, które zostały pomyślnie opublikowane

  • partition_id: identyfikator partycji, do którego zostały opublikowane zdarzenia na liście.

Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_success(events, partition_id). Jest to wymagane, gdy buffered_mode ma wartość True, jeśli buffered_mode ma wartość False.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Wywołanie zwrotne, które ma być wywoływane po opublikowaniu partii. Funkcja wywołania zwrotnego powinna być zdefiniowana w następujący sposób: on_error(events, partition_id, error), gdzie:

  • zdarzenia: lista zdarzeń, których nie można opublikować,

  • partition_id: identyfikator partycji, do którego podjęto próbę opublikowania zdarzeń na liście i

  • błąd: wyjątek związany z niepowodzeniem wysyłania.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, które następnie zostaną wywołane.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie domyślnie zostanie zgłoszony błąd.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli zdarzenia nie będą w kolejce w danym przekroczeniu limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym w kolejce, wywołanie zwrotne on_error zostanie wywołane.

max_buffer_length
int

Tylko tryb buforowany. Łączna liczba zdarzeń na partycję, które można buforować przed wyzwoleniem opróżnienia. Wartość domyślna to 1500 w trybie buforowym.

max_wait_time
Optional[float]

Tylko tryb buforowany. Czas oczekiwania na utworzenie partii z zdarzeniami w buforze przed opublikowaniem. Wartość domyślna to 1 w trybie buforowym.

logging_enable
bool

Czy dane wyjściowe dzienniki śledzenia sieci do rejestratora. Wartość domyślna to Fałsz.

http_proxy
dict

Ustawienia serwera proxy HTTP. Musi to być słownik z następującymi kluczami: "proxy_hostname" (wartość str) i "proxy_port" (wartość int). Ponadto mogą istnieć następujące klucze: "nazwa użytkownika", "hasło".

auth_timeout
float

Czas w sekundach oczekiwania na autoryzację tokenu przez usługę. Wartość domyślna to 60 sekund. Jeśli ustawiono wartość 0, nie zostanie wymuszony limit czasu od klienta.

user_agent
str

Jeśli zostanie określony, zostanie to dodane przed ciągiem agenta użytkownika.

retry_total
int

Całkowita liczba prób ponownego wykonania operacji, która zakończyła się niepowodzeniem po wystąpieniu błędu. Wartość domyślna to 3.

retry_backoff_factor
float

Współczynnik wycofywania stosowany między próbami po drugiej próbie (większość błędów jest usuwana natychmiast przez drugą próbę bez opóźnienia). W trybie stałym zasady ponawiania będą zawsze w stanie uśpienia dla elementu {backoff factor}. W trybie wykładniczym zasady ponawiania prób uśpią następujące elementy: {backoff factor} * (2 ** ({liczba ponownych prób} – 1)) sekund. Jeśli backoff_factor wynosi 0,1, ponawianie próby będzie spać dla [0.0s, 0.2s, 0.4s, ...] między ponownymi próbami. Wartość domyślna to 0,8.

retry_backoff_max
float

Maksymalny czas wolny od pracy. Wartość domyślna to 120 sekund (2 minuty).

retry_mode
str

Opóźnienie między próbami ponawiania prób. Obsługiwane wartości to "fixed" lub "exponential", gdzie wartość domyślna to "wykładnik".

idle_timeout
float

Przekroczenie limitu czasu w sekundach, po którym ten klient zamknie połączenie bazowe, jeśli nie istnieje żadne działanie. Domyślnie wartość to Brak, co oznacza, że klient nie zostanie zamknięty z powodu braku aktywności, chyba że zainicjowany przez usługę.

transport_type
TransportType

Typ protokołu transportu, który będzie używany do komunikacji z usługą Event Hubs. Wartość domyślna to TransportType.Amqp , w którym przypadku jest używany port 5671. Jeśli port 5671 jest niedostępny/zablokowany w środowisku sieciowym, można użyć protokołu TransportType.AmqpOverWebsocket , który używa portu 443 do komunikacji.

custom_endpoint_address
Optional[str]

Niestandardowy adres punktu końcowego używany do nawiązywania połączenia z usługą Event Hubs, umożliwiając kierowanie żądań sieciowych przez wszystkie bramy aplikacji lub inne ścieżki wymagane dla środowiska hosta. Wartość domyślna to Brak. Format będzie taki jak "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Jeśli port nie zostanie określony w custom_endpoint_address, domyślnie zostanie użyty port 443.

connection_verify
Optional[str]

Ścieżka do niestandardowego pliku CA_BUNDLE certyfikatu SSL używanego do uwierzytelniania tożsamości punktu końcowego połączenia. Wartość domyślna to None w tym przypadku certifi.where() będzie używany.

uamqp_transport
bool

Czy używać biblioteki uamqp jako podstawowego transportu. Wartość domyślna to False, a biblioteka Pure Python AMQP będzie używana jako podstawowy transport.

Typ zwracany

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

Przykłady

Utwórz nowe wystąpienie klasy EventHubProducerClient z parametry połączenia.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Liczba zdarzeń buforowanych i oczekujących na opublikowanie dla danej partycji. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w tle coroutine, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu. W przypadku identyfikatora partycji, który nie ma zdarzeń buforowanych, 0 zostanie zwrócone niezależnie od tego, czy ten identyfikator partycji rzeczywiście istnieje w centrum zdarzeń.

get_buffered_event_count(partition_id: str) -> int | None

Parametry

partition_id
str
Wymagane

Identyfikator partycji docelowej.

Typ zwracany

int,

Wyjątki

Jeśli wystąpił błąd podczas opróżniania buforu, jeśli opróżnianie jest ustawione na wartość True lub zamknięcie bazowych połączeń AMQP w trybie buforowym.

get_eventhub_properties

Pobierz właściwości centrum zdarzeń.

Klucze w zwracanym słowniku obejmują:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (lista[str])

async get_eventhub_properties() -> Dict[str, Any]

Zwraca

Słownik zawierający informacje o centrum zdarzeń.

Typ zwracany

Wyjątki

get_partition_ids

Pobierz identyfikatory partycji centrum zdarzeń.

async get_partition_ids() -> List[str]

Zwraca

Lista identyfikatorów partycji.

Typ zwracany

Wyjątki

get_partition_properties

Pobierz właściwości określonej partycji.

Klucze w słowniku właściwości obejmują:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametry

partition_id
str
Wymagane

Identyfikator partycji docelowej.

Zwraca

Dykt właściwości partycji.

Typ zwracany

Wyjątki

send_batch

Wysyła partię danych zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenia w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne wysyłanie w tle.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób:

  • Jeśli wywołanie zwrotne on_error jest przekazywane podczas tworzenia wystąpienia klienta producenta,

    następnie informacje o błędzie zostaną przekazane do wywołania zwrotnego on_error, który następnie zostanie wywołany.

  • Jeśli wywołanie zwrotne on_error nie jest przekazywane podczas tworzenia wystąpienia klienta,

    następnie błąd zostanie zgłoszony domyślnie.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane, a błędy będą obsługiwane w następujący sposób:

  • Jeśli nie można umieścić zdarzeń w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  • Jeśli zdarzenia nie będą wysyłane po pomyślnym zapisie w kolejce, wywołanie zwrotne on_error zostanie wywołane.

W trybie buforowania wysyłanie partii pozostanie nienaruszone i wysłane jako pojedyncza jednostka. Partia nie zostanie ponownie rozmieszona. Może to spowodować nieefektywność wysyłania zdarzeń.

Jeśli wysyłasz ograniczoną listę zdarzeńData lub AmqpAnnotatedMessage i wiesz, że znajduje się ona w limicie rozmiaru ramki centrum zdarzeń, możesz wysłać je za pomocą wywołania send_batch . W przeciwnym razie użyj polecenia create_batch , aby utworzyć element EventDataBatch i dodać element EventData lub AmqpAnnotatedMessage do partii jeden po drugim, aż do limitu rozmiaru, a następnie wywołaj tę metodę, aby wysłać partię.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parametry

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Wymagane

Obiekt EventDataBatch , który ma zostać wysłany lub lista zdarzeń, które mają być wysyłane w partii. Wszystkie zdarzenia EventData lub AmqpAnnotatedMessage na liście lub EventDataBatch zostaną wylądować na tej samej partycji.

timeout
float

Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie niebuforowany lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowym. W trybie bez buforowania domyślny czas oczekiwania określony podczas tworzenia producenta zostanie użyty. W trybie buforowym domyślny czas oczekiwania to Brak.

partition_id
str

Określony identyfikator partycji do wysłania. Wartość domyślna to Brak, w którym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_id, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ samo zdarzenie EventDataBatch ma partition_id.

partition_key
str

W przypadku danego partition_key dane zdarzeń zostaną wysłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli zostanie określony partition_key, a event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama funkcja EventDataBatch ma partition_key. Jeśli zostaną podane zarówno partition_id, jak i partition_key, partition_id będą miały pierwszeństwo. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń do wysłania jest niezalecane, ponieważ partition_key zostanie zignorowana przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key być tylko typem ciągu, mogą one nie przeanalizować wartości innej niż ciąg.

Typ zwracany

Wyjątki

Jeśli wartość określona przez parametr limitu czasu upłynął przed wysłaniem zdarzenia w trybie niebuforowym lub zdarzenia mogą zostać umieszczone w kolejce do buforowanego w trybie buforowym.

Przykłady

Asynchronicznie wysyła dane zdarzeń


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Wysyła dane zdarzenia. Domyślnie metoda będzie blokować do momentu odebrania potwierdzenia lub limitu czasu operacji. Jeśli klasa EventHubProducerClient jest skonfigurowana do uruchamiania w trybie buforowanym, metoda będzie umieszczać zdarzenie w kolejce do buforu lokalnego i zwracać je. Producent wykona automatyczne przetwarzanie wsadowe i wysyłanie w tle.

Jeśli buffered_mode ma wartość False, on_error wywołanie zwrotne jest opcjonalne, a błędy będą obsługiwane w następujący sposób: * Jeśli wywołanie zwrotne on_error zostanie przekazane podczas tworzenia wystąpienia klienta producenta,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Jeśli buffered_mode ma wartość True, on_error wywołanie zwrotne jest wymagane i błędy będą obsługiwane w następujący sposób: * Jeśli zdarzenia nie będą w kolejce w ramach danego limitu czasu, zostanie zgłoszony błąd bezpośrednio.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parametry

event_data
Union[EventData, AmqpAnnotatedMessage]
Wymagane

Obiekt EventData do wysłania.

timeout
float

Maksymalny czas oczekiwania na wysłanie danych zdarzenia w trybie bez buforowania lub maksymalny czas oczekiwania w kolejce danych zdarzenia do buforu w trybie buforowania. W trybie bez buforowania zostanie użyty domyślny czas oczekiwania określony podczas tworzenia producenta. W trybie buforowym domyślny czas oczekiwania to Brak.

partition_id
str

Określony identyfikator partycji do wysłania. Wartość domyślna to Brak. W tym przypadku usługa zostanie przypisana do wszystkich partycji przy użyciu działania okrężnego. Błąd TypeError zostanie zgłoszony, jeśli określono partition_id i event_data_batch jest zdarzeniem EventDataBatch, ponieważ sama klasa EventDataBatch ma partition_id.

partition_key
str

W przypadku danej partition_key dane zdarzeń będą wysyłane do określonej partycji centrum zdarzeń określonego przez usługę. Błąd TypeError zostanie zgłoszony, jeśli określono partition_key i event_data_batch jest zdarzeniem EventDataBatch , ponieważ sama klasa EventDataBatch ma partition_key. Jeśli podano zarówno partition_id, jak i partition_key, pierwszeństwo będzie mieć partition_id. OSTRZEŻENIE: Ustawienie partition_key wartości innej niż ciąg dla zdarzeń do wysłania jest niezalecane, ponieważ partition_key zostanie zignorowana przez usługę Centrum zdarzeń, a zdarzenia zostaną przypisane do wszystkich partycji przy użyciu działania okrężnego. Ponadto istnieją zestawy SDK do używania zdarzeń, które oczekują, że partition_key być tylko typem ciągu, mogą one nie przeanalizować wartości innej niż ciąg.

Typ zwracany

Wyjątki

Jeśli wartość określona przez parametr limitu czasu upłynie przed wysłaniem zdarzenia w trybie bez buforowania lub zdarzenia nie mogą być umieszczone w kolejce do buforowanego w trybie buforowania.

Atrybuty

total_buffered_event_count

Łączna liczba zdarzeń, które są obecnie buforowane i oczekujące na opublikowanie, we wszystkich partycjach. Zwraca wartość None w trybie bez buforowania. UWAGA: bufor zdarzeń jest przetwarzany w tle coroutine, dlatego liczba zdarzeń w buforze zgłoszonym przez ten interfejs API powinna być uznawana tylko za przybliżenie i jest zalecana tylko do użycia w debugowaniu.

Typ zwracany

int,