Sdílet prostřednictvím


EventHubProducerClient Třída

Třída EventHubProducerClient definuje rozhraní vysoké úrovně pro odesílání událostí do služby Azure Event Hubs.

Dědičnost
azure.eventhub._client_base.ClientBase
EventHubProducerClient

Konstruktor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: Literal[False] = False, **kwargs: Any)

Parametry

fully_qualified_namespace
str
Vyžadováno

Plně kvalifikovaný název hostitele pro obor názvů služby Event Hubs. Je pravděpodobné, že se bude podobat .servicebus.windows.net

eventhub_name
str
Vyžadováno

Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.

credential
TokenCredential nebo AzureSasCredential nebo AzureNamedKeyCredential
Vyžadováno

Objekt přihlašovacích údajů používaný k ověřování, který implementuje konkrétní rozhraní pro získávání tokenů. Přijímá EventHubSharedKeyCredentialobjekty přihlašovacích údajů nebo vygenerované knihovnou azure-identity a objekty, které implementují metodu *get_token(self, scopes).

buffered_mode
bool

Pokud je true, bude klient producenta shromažďovat události ve vyrovnávací paměti, efektivně dávkově a pak publikovat. Výchozí hodnota je False.

buffer_concurrency
<xref:ThreadPoolExecutor> nebo int nebo None

ThreadPoolExecutor, který se má použít pro publikování událostí nebo počet pracovních procesů pro ThreadPoolExecutor. Výchozí hodnota je None a vytvoří se ThreadPoolExecutor s výchozím počtem pracovních procesů pro každou z nich. https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

Zpětné volání, které se má volat, jakmile byla dávka úspěšně publikována. Zpětné volání má dva parametry:

  • events: Seznam událostí, které byly úspěšně publikovány

  • partition_id: ID oddílu, do kterého byly události v seznamu publikovány.

Funkce zpětného volání by měla být definována takto: on_success(events, partition_id). Vyžaduje se, pokud buffered_mode má hodnotu True, zatímco volitelná, pokud je buffered_mode false.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

Zpětné volání, které se má volat, jakmile se dávka nepublikuje. Funkce zpětného volání by měla být definovaná takto: on_error(události, partition_id, chyba), kde:

  • events: Seznam událostí, které se nepodařilo publikovat,

  • partition_id: ID oddílu, do kterého se události v seznamu pokusily publikovat, a

  • error: Výjimka související se selháním odesílání.

Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instancí klienta producenta předá zpětné volání on_error ,

    pak se informace o chybě předají zpětnému volání on_error , které se pak zavolá.

  • Pokud se během vytváření instancí klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

max_buffer_length
int

Pouze režim vyrovnávací paměti. Celkový počet událostí na oddíl, které je možné ukládat do vyrovnávací paměti před aktivací vyprázdnění. Výchozí hodnota je 1500 v režimu vyrovnávací paměti.

max_wait_time
Optional[float]

Pouze režim vyrovnávací paměti. Doba čekání na sestavení dávky s událostmi ve vyrovnávací paměti před publikováním. Výchozí hodnota je 1 v režimu vyrovnávací paměti.

logging_enable
bool

Zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.

auth_timeout
float

Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude z klienta vynucován žádný časový limit.

user_agent
str

Pokud je zadaný, přidá se před řetězec uživatelského agenta.

retry_total
int

Celkový počet pokusů o opakování neúspěšné operace při výskytu chyby Výchozí hodnota je 3.

retry_backoff_factor
float

Faktor backoff, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V režimu pevného režimu budou zásady opakování vždy v režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkových opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak opakování přejde do režimu spánku po dobu [0,0s, 0,2s, 0,4s, ...] mezi opakovanými pokusy. Výchozí hodnota je 0,8.

retry_backoff_max
float

Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).

retry_mode
str

Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou pevné nebo exponenciální, kde výchozí hodnota je exponenciální.

idle_timeout
float

Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se kvůli nečinnosti nevystaví, pokud ji neiniciuje služba.

transport_type
TransportType

Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se používá port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, místo toho by se mohl použít transportType.AmqpOverWebsocket , který používá pro komunikaci port 443.

http_proxy
Dict

Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho můžou být k dispozici také následující klíče: uživatelské jméno, heslo.

custom_endpoint_address
Optional[str]

Adresa vlastního koncového bodu, která se má použít pro navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádný. Formát bude vypadat takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud port není v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.

connection_verify
Optional[str]

Cesta k vlastnímu souboru CA_BUNDLE certifikátu SSL, který se používá k ověření identity koncového bodu připojení. Výchozí hodnota je Žádná, v takovém případě se použije certifi.where().

uamqp_transport
bool

Zda se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP v Čistém Pythonu.

socket_timeout
float

Čas v sekundách, kdy by měl podkladový soket na připojení čekat při odesílání a příjmu dat před vypršením časového limitu. Výchozí hodnota je 0,2 pro TransportType.Amqp a 1 pro TransportType.AmqpOverWebsocket. Pokud dochází k chybám EventHubsConnectionError kvůli vypršení časového limitu zápisu, může být potřeba předat větší než výchozí hodnotu. To platí pro pokročilé scénáře použití a obvykle by měla stačit výchozí hodnota.

Příklady

Vytvořte novou instanci EventHubProducerClient.


   import os
   from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   producer = EventHubProducerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
       credential=credential
   )

Metody

close

Zavřete základní připojení a propojení AMQP klienta producenta.

create_batch

Vytvořte objekt EventDataBatch s maximální velikostí veškerého obsahu, který je omezen max_size_in_bytes.

Max_size_in_bytes nesmí být větší než maximální povolená velikost zpráv definovaná službou.

flush

Pouze režim vyrovnávací paměti. Vyprázdnění událostí ve vyrovnávací paměti, které se mají okamžitě odeslat, pokud klient pracuje v režimu vyrovnávací paměti.

from_connection_string

Vytvořte EventHubProducerClient ze připojovací řetězec.

get_buffered_event_count

Počet událostí, které se ukládají do vyrovnávací paměti a čekají na publikování pro daný oddíl. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává ve vlákně na pozadí, proto počet událostí ve vyrovnávací paměti hlášené tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění. V případě ID oddílu, který nemá žádné události ve vyrovnávací paměti, se vrátí hodnota 0 bez ohledu na to, jestli id oddílu ve skutečnosti v centru událostí existuje.

get_eventhub_properties

Získejte vlastnosti centra událostí.

Mezi klíče ve vráceném slovníku patří:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Získejte ID oddílů centra událostí.

get_partition_properties

Získejte vlastnosti zadaného oddílu.

Mezi klíče ve slovníku vlastností patří:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Odešle dávku dat událostí. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda se pokusí zařadí události do vyrovnávací paměti v daném čase, pokud je zadaný a vrátí se. Producent provede automatické odesílání na pozadí v režimu vyrovnávací paměti.

Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instancí klienta producenta předá zpětné volání on_error ,

    pak se informace o chybě předají zpětnému volání on_error , které se pak zavolá.

  • Pokud se během vytváření instancí klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

V režimu vyrovnávací paměti zůstane odeslání dávky nedotčené a odesláno jako jedna jednotka. Uspořádání dávky nebude přeuspořádané. To může vést k neefektivitě odesílání událostí.

Pokud odesíláte konečný seznam EventData nebo AmqpAnnotatedMessage a víte, že je v limitu velikosti rámce centra událostí, můžete je poslat s send_batch voláním. V opačném případě použijte create_batch k vytvoření EventDataBatch a přidejte buď EventData nebo AmqpAnnotatedMessage do dávky jeden po druhém až do limitu velikosti, a pak voláním této metody odešlete dávku.

send_event

Odešle data události. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda se pokusí zařadí události do vyrovnávací paměti v daném čase, pokud je zadaný a vrátí se. Producent provede automatické odesílání na pozadí v režimu vyrovnávací paměti.

Pokud je buffered_mode false, on_error zpětné volání je volitelné a chyby se budou zpracovávat takto: * Pokud se během vytváření instance klienta producenta předá zpětné volání on_error ,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem: * Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Zavřete základní připojení a propojení AMQP klienta producenta.

close(*, flush: bool = True, **kwargs: Any) -> None

Parametry

flush
bool

Pouze režim vyrovnávací paměti. Pokud je nastavená hodnota True, události ve vyrovnávací paměti se odešlou okamžitě. Výchozí hodnota je Pravda.

timeout
float nebo None

Pouze režim vyrovnávací paměti. Vypršení časového limitu pro zavření producenta Výchozí hodnota je Žádná, což znamená žádný časový limit.

Návratový typ

Výjimky

Pokud došlo k chybě při vyprázdnění vyrovnávací paměti, pokud je vyprázdnění nastaveno na Hodnotu True nebo zavření podkladových připojení AMQP v režimu vyrovnávací paměti.

Příklady

Zavřete klienta.


   import os
   from azure.eventhub import EventHubProducerClient, EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = producer.create_batch()

       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # EventDataBatch object reaches max_size.
               # New EventDataBatch object can be created here to send more data
               break

       producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       producer.close()

create_batch

Vytvořte objekt EventDataBatch s maximální velikostí veškerého obsahu, který je omezen max_size_in_bytes.

Max_size_in_bytes nesmí být větší než maximální povolená velikost zpráv definovaná službou.

create_batch(**kwargs: Any) -> EventDataBatch

Návratový typ

Výjimky

Pokud došlo k chybě při vyprázdnění vyrovnávací paměti, pokud je vyprázdnění nastaveno na Hodnotu True nebo zavření podkladových připojení AMQP v režimu vyrovnávací paměti.

Příklady

Vytvoření objektu EventDataBatch v omezené velikosti


       event_data_batch = producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Pouze režim vyrovnávací paměti. Vyprázdnění událostí ve vyrovnávací paměti, které se mají okamžitě odeslat, pokud klient pracuje v režimu vyrovnávací paměti.

flush(**kwargs: Any) -> None

Parametry

timeout
Optional[float]

Časový limit pro vyprázdnění událostí ve vyrovnávací paměti, výchozí hodnota je Žádná, což znamená žádný časový limit.

Návratový typ

Výjimky

Pokud se producentovi nepodaří vyprázdnit vyrovnávací paměť v daném časovém limitu v režimu vyrovnávací paměti.

from_connection_string

Vytvořte EventHubProducerClient ze připojovací řetězec.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: Literal[False] = False, **kwargs: Any) -> EventHubProducerClient

Parametry

conn_str
str
Vyžadováno

Připojovací řetězec centra událostí.

eventhub_name
str

Cesta ke konkrétnímu centru událostí, ke kterému se má klient připojit.

buffered_mode
bool

Pokud je true, bude klient producenta shromažďovat události ve vyrovnávací paměti, efektivně dávkově a pak publikovat. Výchozí hodnota je False.

buffer_concurrency
<xref:ThreadPoolExecutor> nebo int nebo None

ThreadPoolExecutor, který se má použít pro publikování událostí nebo počet pracovních procesů pro ThreadPoolExecutor. Výchozí hodnota je None a vytvoří se ThreadPoolExecutor s výchozím počtem pracovních procesů pro každou z nich. https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], None]]

Zpětné volání, které se má volat, jakmile byla dávka úspěšně publikována. Zpětné volání má dva parametry:

  • events: Seznam událostí, které byly úspěšně publikovány

  • partition_id: ID oddílu, do kterého byly události v seznamu publikovány.

Funkce zpětného volání by měla být definována takto: on_success(events, partition_id). Vyžaduje se , pokud má buffered_mode hodnotu True, zatímco volitelná , pokud je buffered_mode false.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], None]]

Zpětné volání, které se má volat, jakmile se dávka nepublikuje. Vyžaduje se, pokud je v buffered_mode hodnota True, zatímco volitelná , pokud buffered_mode false. Funkce zpětného volání by měla být definovaná takto: on_error(události, partition_id, chyba), kde:

  • events: Seznam událostí, které se nepodařilo publikovat,

  • partition_id: ID oddílu, do kterého se události v seznamu pokusily publikovat, a

  • error: Výjimka související se selháním odesílání.

Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instancí klienta producenta předá zpětné volání on_error ,

    pak se informace o chybě předají zpětnému volání on_error , které se pak zavolá.

  • Pokud se během vytváření instancí klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

max_buffer_length
int

Pouze režim vyrovnávací paměti. Celkový počet událostí na oddíl, které je možné ukládat do vyrovnávací paměti před aktivací vyprázdnění. Výchozí hodnota je 1500 v režimu vyrovnávací paměti.

max_wait_time
Optional[float]

Pouze režim vyrovnávací paměti. Doba čekání na sestavení dávky s událostmi ve vyrovnávací paměti před publikováním. Výchozí hodnota je 1 v režimu vyrovnávací paměti.

logging_enable
bool

Zda se mají protokoly trasování sítě vypisovat do protokolovacího nástroje. Výchozí hodnota je False.

http_proxy
Dict

Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho můžou být k dispozici také následující klíče: uživatelské jméno, heslo.

auth_timeout
float

Doba v sekundách čekání na autorizaci tokenu službou. Výchozí hodnota je 60 sekund. Pokud je nastavená hodnota 0, nebude z klienta vynucován žádný časový limit.

user_agent
str

Pokud je zadaný, přidá se před řetězec uživatelského agenta.

retry_total
int

Celkový počet pokusů o opakování neúspěšné operace při výskytu chyby Výchozí hodnota je 3.

retry_backoff_factor
float

Faktor backoff, který se použije mezi pokusy po druhém pokusu (většina chyb se vyřeší okamžitě druhým pokusem bez zpoždění). V režimu pevného režimu budou zásady opakování vždy v režimu spánku pro {backoff factor}. V exponenciálním režimu budou zásady opakování v režimu spánku po dobu: {backoff factor} * (2 ** ({počet celkových opakování} – 1)) sekund. Pokud je backoff_factor 0,1, pak opakování přejde do režimu spánku po dobu [0,0s, 0,2s, 0,4s, ...] mezi opakovanými pokusy. Výchozí hodnota je 0,8.

retry_backoff_max
float

Maximální doba návratu. Výchozí hodnota je 120 sekund (2 minuty).

retry_mode
str

Chování zpoždění mezi opakovanými pokusy. Podporované hodnoty jsou pevné nebo exponenciální, kde výchozí hodnota je exponenciální.

idle_timeout
float

Časový limit v sekundách, po jehož uplynutí klient ukončí základní připojení, pokud nedojde k žádné aktivitě. Ve výchozím nastavení je hodnota None, což znamená, že klient se kvůli nečinnosti nevystaví, pokud ji neiniciuje služba.

transport_type
TransportType

Typ přenosového protokolu, který se použije ke komunikaci se službou Event Hubs. Výchozí hodnota je TransportType.Amqp , v takovém případě se používá port 5671. Pokud je port 5671 v síťovém prostředí nedostupný nebo blokovaný, místo toho by se mohl použít transportType.AmqpOverWebsocket , který používá pro komunikaci port 443.

http_proxy

Nastavení proxy serveru HTTP. Musí se jednat o slovník s následujícími klíči: "proxy_hostname" (hodnota str) a "proxy_port" (hodnota int). Kromě toho můžou být k dispozici také následující klíče: uživatelské jméno, heslo.

custom_endpoint_address
Optional[str]

Adresa vlastního koncového bodu, která se má použít pro navázání připojení ke službě Event Hubs, což umožňuje směrování síťových požadavků přes všechny aplikační brány nebo jiné cesty potřebné pro hostitelské prostředí. Výchozí hodnota je Žádný. Formát bude vypadat takto: "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Pokud port není v custom_endpoint_address zadaný, použije se ve výchozím nastavení port 443.

connection_verify
Optional[str]

Cesta k vlastnímu souboru CA_BUNDLE certifikátu SSL, který se používá k ověření identity koncového bodu připojení. Výchozí hodnota je Žádná, v takovém případě se použije certifi.where().

uamqp_transport
bool

Zda se má jako podkladový přenos použít knihovna uamqp . Výchozí hodnota je False a jako podkladový přenos se použije knihovna AMQP v Čistém Pythonu.

Návratový typ

Výjimky

Pokud došlo k chybě při vyprázdnění vyrovnávací paměti, pokud je vyprázdnění nastaveno na Hodnotu True nebo zavření podkladových připojení AMQP v režimu vyrovnávací paměti.

Příklady

Vytvořte novou instanci EventHubProducerClient z připojovací řetězec.


   import os
   from azure.eventhub import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Počet událostí, které se ukládají do vyrovnávací paměti a čekají na publikování pro daný oddíl. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává ve vlákně na pozadí, proto počet událostí ve vyrovnávací paměti hlášené tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění. V případě ID oddílu, který nemá žádné události ve vyrovnávací paměti, se vrátí hodnota 0 bez ohledu na to, jestli id oddílu ve skutečnosti v centru událostí existuje.

get_buffered_event_count(partition_id: str) -> int | None

Parametry

partition_id
str
Vyžadováno

ID cílového oddílu.

Návratový typ

int,

Výjimky

Pokud došlo k chybě při vyprázdnění vyrovnávací paměti, pokud je vyprázdnění nastaveno na Hodnotu True nebo zavření podkladových připojení AMQP v režimu vyrovnávací paměti.

get_eventhub_properties

Získejte vlastnosti centra událostí.

Mezi klíče ve vráceném slovníku patří:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Návraty

Slovník obsahující vlastnosti eventhubu.

Návratový typ

Výjimky

get_partition_ids

Získejte ID oddílů centra událostí.

get_partition_ids() -> List[str]

Návraty

Seznam ID oddílů.

Návratový typ

Výjimky

get_partition_properties

Získejte vlastnosti zadaného oddílu.

Mezi klíče ve slovníku vlastností patří:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametry

partition_id
str
Vyžadováno

ID cílového oddílu.

Návraty

Slovník vlastností oddílu.

Návratový typ

Výjimky

send_batch

Odešle dávku dat událostí. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda se pokusí zařadí události do vyrovnávací paměti v daném čase, pokud je zadaný a vrátí se. Producent provede automatické odesílání na pozadí v režimu vyrovnávací paměti.

Pokud je buffered_mode False, on_error zpětné volání je volitelné a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se během vytváření instancí klienta producenta předá zpětné volání on_error ,

    pak se informace o chybě předají zpětnému volání on_error , které se pak zavolá.

  • Pokud se během vytváření instancí klienta nepředá zpětné volání on_error,

    pak bude ve výchozím nastavení vyvolána chyba.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem:

  • Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  • Pokud se události po úspěšném zařazení do fronty nepodaří odeslat, bude volána zpětná volání on_error .

V režimu vyrovnávací paměti zůstane odeslání dávky nedotčené a odesláno jako jedna jednotka. Uspořádání dávky nebude přeuspořádané. To může vést k neefektivitě odesílání událostí.

Pokud odesíláte konečný seznam EventData nebo AmqpAnnotatedMessage a víte, že je v limitu velikosti rámce centra událostí, můžete je poslat s send_batch voláním. V opačném případě použijte create_batch k vytvoření EventDataBatch a přidejte buď EventData nebo AmqpAnnotatedMessage do dávky jeden po druhém až do limitu velikosti, a pak voláním této metody odešlete dávku.

send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parametry

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Vyžadováno

Objekt EventDataBatch , který se má odeslat, nebo seznam EventData , který se má odeslat v dávce. Všechny EventData nebo AmqpAnnotatedMessage v seznamu nebo EventDataBatch přistanou do stejného oddílu.

timeout
float

Maximální doba čekání na odeslání dat události v režimu bez vyrovnávací paměti nebo maximální doba čekání na zařazení dat události do vyrovnávací paměti v režimu vyrovnávací paměti. V režimu bez vyrovnávací paměti se použije výchozí doba čekání zadaná při vytvoření producenta. Ve vyrovnávacím režimu je výchozí doba čekání Žádná.

partition_id
str

KONKRÉTNÍ ID oddílu, do které se má odeslat. Výchozí hodnota je Žádný. V takovém případě služba přiřadí všechny oddíly pomocí kruhového dotazování. Pokud je zadána partition_id a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_id.

partition_key
str

S daným partition_key se data událostí odesílají do konkrétního oddílu centra událostí, o které rozhodne služba. Pokud je zadána partition_key a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_key. Pokud jsou k dispozici partition_id i partition_key, bude mít přednost partition_id. UPOZORNĚNÍ: Nastavení partition_key neřetězcové hodnoty u odeslaných událostí se nedoporučuje, protože služba centra událostí bude partition_key ignorovat a události budou přiřazeny ke všem oddílům pomocí kruhového dotazování. Kromě toho existují sady SDK pro využívání událostí, které očekávají, že partition_key budou pouze typu řetězec. Nemusí se jim podařit analyzovat neřetězcovou hodnotu.

Návratový typ

Výjimky

Pokud hodnota určená parametrem časového limitu uplynou před odesláním události v režimu bez vyrovnávací paměti nebo události nelze zařadit do vyrovnávací paměti v režimu vyrovnávací paměti.

Příklady

Odesílá data událostí.


       with producer:
           event_data_batch = producer.create_batch()

           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # EventDataBatch object reaches max_size.
                   # New EventDataBatch object can be created here to send more data
                   break

           producer.send_batch(event_data_batch)

send_event

Odešle data události. Ve výchozím nastavení bude metoda blokovat, dokud nedojde k potvrzení nebo dokud nevypadne časový limit operace. Pokud je EventHubProducerClient nakonfigurovaný tak, aby běžel v režimu vyrovnávací paměti, metoda se pokusí zařadí události do vyrovnávací paměti v daném čase, pokud je zadaný a vrátí se. Producent provede automatické odesílání na pozadí v režimu vyrovnávací paměti.

Pokud je buffered_mode false, on_error zpětné volání je volitelné a chyby se budou zpracovávat takto: * Pokud se během vytváření instance klienta producenta předá zpětné volání on_error ,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Pokud má buffered_mode hodnotu True, vyžaduje se on_error zpětné volání a chyby se budou zpracovávat následujícím způsobem: * Pokud se událostem nepodaří vytvořit frontu v daném časovém limitu, bude přímo vyvolána chyba.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parametry

event_data
Union[EventData, AmqpAnnotatedMessage]
Vyžadováno

Objekt EventData , který se má odeslat.

timeout
float

Maximální doba čekání na odeslání dat události v režimu bez vyrovnávací paměti nebo maximální doba čekání na zařazení dat události do vyrovnávací paměti v režimu vyrovnávací paměti. V režimu bez vyrovnávací paměti se použije výchozí doba čekání zadaná při vytvoření producenta. Ve vyrovnávacím režimu je výchozí doba čekání Žádná.

partition_id
str

KONKRÉTNÍ ID oddílu, do které se má odeslat. Výchozí hodnota je Žádný. V takovém případě služba přiřadí všechny oddíly pomocí kruhového dotazování. Pokud je zadána partition_id a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_id.

partition_key
str

S daným partition_key se data událostí odesílají do konkrétního oddílu centra událostí, o které rozhodne služba. Pokud je zadána partition_key a event_data_batch je EventDataBatch, vyvolá se chyba TypeError, protože samotná hodnota EventDataBatch má partition_key. Pokud jsou k dispozici partition_id i partition_key, bude mít přednost partition_id. UPOZORNĚNÍ: Nastavení partition_key neřetězcové hodnoty u odeslaných událostí se nedoporučuje, protože služba centra událostí bude partition_key ignorovat a události budou přiřazeny ke všem oddílům pomocí kruhového dotazování. Kromě toho existují sady SDK pro využívání událostí, které očekávají, že partition_key budou pouze typu řetězec. Nemusí se jim podařit analyzovat neřetězcovou hodnotu.

Návratový typ

Výjimky

Pokud hodnota určená parametrem časového limitu uplynou před odesláním události v režimu bez vyrovnávací paměti nebo události mohou být zapsány do vyrovnávací paměti v režimu vyrovnávací paměti.

Atributy

total_buffered_event_count

Celkový počet událostí, které jsou aktuálně ve vyrovnávací paměti a čekají na publikování, napříč všemi oddíly. Vrátí hodnotu None v režimu bez vyrovnávací paměti. POZNÁMKA: Vyrovnávací paměť událostí se zpracovává ve vlákně na pozadí, proto počet událostí ve vyrovnávací paměti hlášené tímto rozhraním API by měl být považován pouze za aproximaci a doporučuje se pouze pro použití při ladění.

Návratový typ

int,