Dela via


EventHubProducerClient Klass

Klassen EventHubProducerClient definierar ett gränssnitt på hög nivå för att skicka händelser till Azure Event Hubs-tjänsten.

Arv
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubProducerClient

Konstruktor

EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)

Parametrar

fully_qualified_namespace
str
Obligatorisk

Det fullständigt kvalificerade värdnamnet för Event Hubs-namnområdet. Detta kommer sannolikt att likna .servicebus.windows.net

eventhub_name
str
Obligatorisk

Sökvägen till den specifika händelsehubb som klienten ska anslutas till.

credential
AsyncTokenCredential eller AzureSasCredential eller AzureNamedKeyCredential
Obligatorisk

Det autentiseringsobjekt som används för autentisering som implementerar ett visst gränssnitt för att hämta token. Den accepterar , eller autentiseringsobjekt som genereras av azure-identity-biblioteket och objekt som implementerar EventHubSharedKeyCredentialmetoden *get_token(self, scopes).

buffered_mode
bool

Om sant samlar producentklienten in händelser i en buffert, effektivt batch och publicerar sedan. Standardvärdet är Falskt.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Återanropet anropas när en batch har publicerats. Återanropet tar två parametrar:

  • händelser: Listan över händelser som har publicerats

  • partition_id: Det partitions-ID som händelserna i listan har publicerats till.

Återanropsfunktionen ska definieras som: on_success(events, partition_id). Krävs när buffered_mode är Sant om det är valfritt om buffered_mode är Falskt.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Återanropet anropas när en batch inte har publicerats. Krävs när i buffered_mode är Sant medan valfritt om buffered_mode är Falskt. Återanropsfunktionen ska definieras som: on_error(events, partition_id, error), where:

  • händelser: Listan över händelser som inte kunde publiceras,

  • partition_id: Partitions-ID:t som händelserna i listan har försökt publiceras till och

  • fel: Undantaget som rör sändningsfelet.

Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:

  • Om ett on_error återanrop skickas under producentklientens instansiering,

    sedan skickas felinformation till on_error återanrop, som sedan anropas.

  • Om ett on_error återanrop inte skickas under klient-instansieringen,

    då utlöses felet som standard.

Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:

  • Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.

  • Om händelser inte kan skickas efter enqueuing anropas on_error återanrop.

max_buffer_length
int

Endast buffrat läge. Det totala antalet händelser per partition som kan bufferas innan en tömning utlöses. Standardvärdet är 1 500 i buffrat läge.

max_wait_time
Optional[float]

Endast buffrat läge. Hur lång tid det går att vänta på att en batch skapas med händelser i bufferten innan publiceringen. Standardvärdet är 1 i buffrat läge.

logging_enable
bool

Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.

auth_timeout
float

Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.

user_agent
str

Om detta anges läggs detta till framför användaragentsträngen.

retry_total
int

Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3.

retry_backoff_factor
float

En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.

retry_backoff_max
float

Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).

retry_mode
str

Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".

idle_timeout
float

Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.

transport_type
TransportType

Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.

http_proxy
dict

HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".

custom_endpoint_address
Optional[str]

Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.

connection_verify
Optional[str]

Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.

uamqp_transport
bool

Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.

socket_timeout
float

Tiden i sekunder som den underliggande socketen på anslutningen ska vänta när du skickar och tar emot data innan tidsgränsen nås. Standardvärdet är 0.2 för TransportType.Amqp och 1 för TransportType.AmqpOverWebsocket. Om EventHubsConnectionError-fel inträffar på grund av skrivtidsutgång kan ett större värde än standardvärdet behöva skickas in. Detta gäller för avancerade användningsscenarier och normalt bör standardvärdet vara tillräckligt.

Exempel

Skapa en ny instans av EventHubProducerClient.


   import os
   from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Metoder

close

Stäng den underliggande AMQP-anslutningen för producentklienten och länkarna.

create_batch

Skapa ett EventDataBatch-objekt med maxstorleken för allt innehåll som begränsas av max_size_in_bytes.

Max_size_in_bytes får inte vara större än den maximala tillåtna meddelandestorleken som definieras av tjänsten.

flush

Endast buffrat läge. Töm händelser i bufferten som ska skickas omedelbart om klienten arbetar i buffrat läge.

from_connection_string

Skapa en EventHubProducerClient från en anslutningssträng.

get_buffered_event_count

Antalet händelser som buffrats och väntar på att publiceras för en viss partition. Returnerar Ingen i icke-buffrat läge. Obs! Händelsebufferten bearbetas i en coroutine i bakgrunden. Därför bör antalet händelser i bufferten som rapporteras av detta API endast betraktas som en uppskattning och rekommenderas endast för felsökning. För ett partitions-ID som inte har några buffrade händelser returneras 0 oavsett om partitions-ID:t faktiskt finns i händelsehubben.

get_eventhub_properties

Hämta egenskaper för händelsehubben.

Nycklar i den returnerade ordlistan är:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Hämta partitions-ID:t för händelsehubben.

get_partition_properties

Hämta egenskaper för den angivna partitionen.

Nycklar i egenskapsordlistan är:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

send_batch

Skickar en batch med händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelserna till lokal buffert och returnera. Producenten skickar automatiskt i bakgrunden.

Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:

  • Om ett on_error återanrop skickas under producentklientens instansiering,

    sedan skickas felinformation till on_error återanrop, som sedan anropas.

  • Om ett on_error återanrop inte skickas under klient-instansieringen,

    då utlöses felet som standard.

Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:

  • Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.

  • Om händelser inte kan skickas efter enqueuing anropas on_error återanrop.

I buffrat läge förblir sändningen av en batch intakt och skickas som en enda enhet. Batchen ordnas inte om. Detta kan leda till ineffektivitet när händelser skickas.

Om du skickar en begränsad lista över EventData eller AmqpAnnotatedMessage och du vet att det är inom storleksgränsen för händelsehubbens ram kan du skicka dem med ett send_batch-anrop . Annars kan du använda create_batch för att skapa EventDataBatch och lägga till EventData eller AmqpAnnotatedMessage i batchen en i taget fram till storleksgränsen och anropa sedan den här metoden för att skicka ut batchen.

send_event

Skickar en händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelsen till lokal buffert och returnera. Producenten utför automatisk batchbearbetning och skickar i bakgrunden.

Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt: * Om ett on_error återanrop skickas under producentklientens instansiering,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Om buffered_mode är Sant krävs on_error återanrop och felen hanteras på följande sätt: * Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.

close

Stäng den underliggande AMQP-anslutningen för producentklienten och länkarna.

async close(*, flush: bool = True, **kwargs: Any) -> None

Parametrar

flush
bool

Endast buffrat läge. Om värdet är True skickas händelser i bufferten omedelbart. Standardvärdet är Sant.

timeout
float eller None

Endast buffrat läge. Timeout för att stänga producenten. Standardvärdet är Ingen, vilket innebär att det inte finns någon tidsgräns.

Returtyp

Undantag

Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.

Exempel

Stäng hanteraren.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   from azure.eventhub import EventData

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
   try:
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break
       await producer.send_batch(event_data_batch)
   finally:
       # Close down the producer handler.
       await producer.close()

create_batch

Skapa ett EventDataBatch-objekt med maxstorleken för allt innehåll som begränsas av max_size_in_bytes.

Max_size_in_bytes får inte vara större än den maximala tillåtna meddelandestorleken som definieras av tjänsten.

async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch

Returtyp

Undantag

Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.

Exempel

Skapa EventDataBatch-objekt med begränsad storlek


       from azure.eventhub import EventData
       event_data_batch = await producer.create_batch()
       while True:
           try:
               event_data_batch.add(EventData('Message inside EventBatchData'))
           except ValueError:
               # The EventDataBatch object reaches its max_size.
               # You can send the full EventDataBatch object and create a new one here.
               break

flush

Endast buffrat läge. Töm händelser i bufferten som ska skickas omedelbart om klienten arbetar i buffrat läge.

async flush(**kwargs: Any) -> None

Parametrar

timeout
float eller None

Timeout för att tömma de buffrade händelserna, standardvärdet är Ingen, vilket innebär att det inte finns någon tidsgräns.

Returtyp

Undantag

Om producenten inte kan tömma bufferten inom den angivna tidsgränsen i buffrat läge.

from_connection_string

Skapa en EventHubProducerClient från en anslutningssträng.

from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient

Parametrar

conn_str
str
Obligatorisk

Anslutningssträng för en händelsehubb.

eventhub_name
str

Sökvägen till den specifika händelsehubb som klienten ska anslutas till.

buffered_mode
bool

Om sant samlar producentklienten in händelser i en buffert, effektivt batch och publicerar sedan. Standardvärdet är Falskt.

on_success
Optional[Callable[[<xref:SendEventTypes>, Optional[str]], Awaitable[None]]]

Återanropet anropas när en batch har publicerats. Återanropet tar två parametrar:

  • händelser: Listan över händelser som har publicerats

  • partition_id: Det partitions-ID som händelserna i listan har publicerats till.

Återanropsfunktionen ska definieras som: on_success(events, partition_id). Det krävs när buffered_mode är Sant medan det är valfritt om buffered_mode är Falskt.

on_error
Optional[Callable[[<xref:SendEventTypes>, Optional[str], Exception], Awaitable[None]]]

Återanropet anropas när en batch inte har publicerats. Återanropsfunktionen ska definieras som: on_error(events, partition_id, error), where:

  • händelser: Listan över händelser som inte kunde publiceras,

  • partition_id: Partitions-ID:t som händelserna i listan har försökt publiceras till och

  • fel: Undantaget som rör sändningsfelet.

Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:

  • Om ett on_error återanrop skickas under producentklientens instansiering,

    sedan skickas felinformation till on_error återanrop, som sedan anropas.

  • Om ett on_error återanrop inte skickas under klient-instansieringen,

    då utlöses felet som standard.

Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:

  • Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.

  • Om händelser inte kan skickas efter enqueuing anropas on_error återanrop.

max_buffer_length
int

Endast buffrat läge. Det totala antalet händelser per partition som kan bufferas innan en tömning utlöses. Standardvärdet är 1 500 i buffrat läge.

max_wait_time
Optional[float]

Endast buffrat läge. Hur lång tid det går att vänta på att en batch skapas med händelser i bufferten innan publiceringen. Standardvärdet är 1 i buffrat läge.

logging_enable
bool

Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.

http_proxy
dict

HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".

auth_timeout
float

Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.

user_agent
str

Om detta anges läggs detta till framför användaragentsträngen.

retry_total
int

Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3.

retry_backoff_factor
float

En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.

retry_backoff_max
float

Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).

retry_mode
str

Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".

idle_timeout
float

Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.

transport_type
TransportType

Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.

custom_endpoint_address
Optional[str]

Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.

connection_verify
Optional[str]

Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.

uamqp_transport
bool

Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.

Returtyp

Undantag

Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.

Exempel

Skapa en ny instans av EventHubProducerClient från anslutningssträng.


   import os
   from azure.eventhub.aio import EventHubProducerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   producer = EventHubProducerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_buffered_event_count

Antalet händelser som buffrats och väntar på att publiceras för en viss partition. Returnerar Ingen i icke-buffrat läge. Obs! Händelsebufferten bearbetas i en coroutine i bakgrunden. Därför bör antalet händelser i bufferten som rapporteras av detta API endast betraktas som en uppskattning och rekommenderas endast för felsökning. För ett partitions-ID som inte har några buffrade händelser returneras 0 oavsett om partitions-ID:t faktiskt finns i händelsehubben.

get_buffered_event_count(partition_id: str) -> int | None

Parametrar

partition_id
str
Obligatorisk

Målpartitions-ID: t.

Returtyp

int,

Undantag

Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.

get_eventhub_properties

Hämta egenskaper för händelsehubben.

Nycklar i den returnerade ordlistan är:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Returer

En ordlista som innehåller information om händelsehubben.

Returtyp

Undantag

get_partition_ids

Hämta partitions-ID:t för händelsehubben.

async get_partition_ids() -> List[str]

Returer

En lista över partitions-ID:t.

Returtyp

Undantag

get_partition_properties

Hämta egenskaper för den angivna partitionen.

Nycklar i egenskapsordlistan är:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parametrar

partition_id
str
Obligatorisk

Målpartitions-ID: t.

Returer

En dikta med partitionsegenskaper.

Returtyp

Undantag

send_batch

Skickar en batch med händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelserna till lokal buffert och returnera. Producenten skickar automatiskt i bakgrunden.

Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:

  • Om ett on_error återanrop skickas under producentklientens instansiering,

    sedan skickas felinformation till on_error återanrop, som sedan anropas.

  • Om ett on_error återanrop inte skickas under klient-instansieringen,

    då utlöses felet som standard.

Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:

  • Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.

  • Om händelser inte kan skickas efter enqueuing anropas on_error återanrop.

I buffrat läge förblir sändningen av en batch intakt och skickas som en enda enhet. Batchen ordnas inte om. Detta kan leda till ineffektivitet när händelser skickas.

Om du skickar en begränsad lista över EventData eller AmqpAnnotatedMessage och du vet att det är inom storleksgränsen för händelsehubbens ram kan du skicka dem med ett send_batch-anrop . Annars kan du använda create_batch för att skapa EventDataBatch och lägga till EventData eller AmqpAnnotatedMessage i batchen en i taget fram till storleksgränsen och anropa sedan den här metoden för att skicka ut batchen.

async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None

Parametrar

event_data_batch
Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
Obligatorisk

EventDataBatch-objektet som ska skickas eller en lista över EventData som ska skickas i en batch. Alla EventData eller AmqpAnnotatedMessage i listan eller EventDataBatch hamnar på samma partition.

timeout
float

Den maximala väntetiden för att skicka händelsedata i icke-buffrat läge eller den maximala väntetiden för att skicka händelsedata till bufferten i buffrat läge. I icke-buffrat läge används den standardväntetid som angavs när producenten skapades. I buffrat läge är standardväntetiden Ingen.

partition_id
str

Det specifika partitions-ID som ska skickas till. Standard är Ingen, i vilket fall tjänsten tilldelar till alla partitioner med hjälp av resursallokering. En TypeError aktiveras om partition_id anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_id.

partition_key
str

Med den angivna partition_key skickas händelsedata till en viss partition av händelsehubben som bestäms av tjänsten. En TypeError aktiveras om partition_key anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_key. Om både partition_id och partition_key anges har partition_id företräde. VARNING! Det rekommenderas inte att ange partition_key för icke-strängvärde för de händelser som ska skickas eftersom partition_key ignoreras av Event Hub-tjänsten och händelser tilldelas till alla partitioner med resursallokering. Dessutom finns det SDK:er för användning av händelser som förväntar sig att partition_key endast är strängtyp. De kan misslyckas med att parsa värdet som inte är sträng.

Returtyp

Undantag

Om värdet som anges av timeout-parametern förflutit innan händelsen kan skickas i icke-buffrat läge eller om händelserna kan placeras i buffrat läge.

Exempel

Skickar händelsedata asynkront


       async with producer:
           event_data_batch = await producer.create_batch()
           while True:
               try:
                   event_data_batch.add(EventData('Message inside EventBatchData'))
               except ValueError:
                   # The EventDataBatch object reaches its max_size.
                   # You can send the full EventDataBatch object and create a new one here.
                   break
           await producer.send_batch(event_data_batch)

send_event

Skickar en händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelsen till lokal buffert och returnera. Producenten utför automatisk batchbearbetning och skickar i bakgrunden.

Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt: * Om ett on_error återanrop skickas under producentklientens instansiering,

       then error information will be passed to the *on_error* callback, which will then be called.

  * If an *on_error* callback is not passed in during client instantiation,

       then the error will be raised by default.

Om buffered_mode är Sant krävs on_error återanrop och felen hanteras på följande sätt: * Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.

  * If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None

Parametrar

event_data
Union[EventData, AmqpAnnotatedMessage]
Obligatorisk

EventData-objektet som ska skickas.

timeout
float

Den maximala väntetiden för att skicka händelsedata i icke-buffrat läge eller den maximala väntetiden för att skicka händelsedata till bufferten i buffrat läge. I icke-buffrat läge används den standardväntetid som angavs när producenten skapades. I buffrat läge är standardväntetiden Ingen.

partition_id
str

Det specifika partitions-ID som ska skickas till. Standard är Ingen, i vilket fall tjänsten tilldelar till alla partitioner med hjälp av resursallokering. En TypeError aktiveras om partition_id anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_id.

partition_key
str

Med den angivna partition_key skickas händelsedata till en viss partition av händelsehubben som bestäms av tjänsten. En TypeError aktiveras om partition_key anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_key. Om både partition_id och partition_key anges har partition_id företräde. VARNING! Det rekommenderas inte att ange partition_key för icke-strängvärde för de händelser som ska skickas eftersom partition_key ignoreras av Event Hub-tjänsten och händelser tilldelas till alla partitioner med resursallokering. Dessutom finns det SDK:er för användning av händelser som förväntar sig att partition_key endast är strängtyp. De kan misslyckas med att parsa värdet som inte är sträng.

Returtyp

Undantag

Om värdet som anges av timeout-parametern förflutit innan händelsen kan skickas i icke-buffrat läge eller om händelserna inte kan placeras i buffrat läge.

Attribut

total_buffered_event_count

Det totala antalet händelser som för närvarande buffrats och väntar på att publiceras över alla partitioner. Returnerar Ingen i icke-buffrat läge. Obs! Händelsebufferten bearbetas i en coroutine i bakgrunden. Därför bör antalet händelser i bufferten som rapporteras av detta API endast betraktas som en uppskattning och rekommenderas endast för felsökning.

Returtyp

int,