EventHubProducerClient Klass
Klassen EventHubProducerClient definierar ett gränssnitt på hög nivå för att skicka händelser till Azure Event Hubs-tjänsten.
- Arv
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubProducerClient
Konstruktor
EventHubProducerClient(fully_qualified_namespace: str, eventhub_name: str, credential: CredentialTypes, *, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, **kwargs: Any)
Parametrar
- fully_qualified_namespace
- str
Det fullständigt kvalificerade värdnamnet för Event Hubs-namnområdet. Detta kommer sannolikt att likna .servicebus.windows.net
- eventhub_name
- str
Sökvägen till den specifika händelsehubb som klienten ska anslutas till.
- credential
- AsyncTokenCredential eller AzureSasCredential eller AzureNamedKeyCredential
Det autentiseringsobjekt som används för autentisering som implementerar ett visst gränssnitt för att hämta token. Den accepterar , eller autentiseringsobjekt som genereras av azure-identity-biblioteket och objekt som implementerar EventHubSharedKeyCredentialmetoden *get_token(self, scopes).
- buffered_mode
- bool
Om sant samlar producentklienten in händelser i en buffert, effektivt batch och publicerar sedan. Standardvärdet är Falskt.
Återanropet anropas när en batch har publicerats. Återanropet tar två parametrar:
händelser: Listan över händelser som har publicerats
partition_id: Det partitions-ID som händelserna i listan har publicerats till.
Återanropsfunktionen ska definieras som: on_success(events, partition_id). Krävs när buffered_mode är Sant om det är valfritt om buffered_mode är Falskt.
Återanropet anropas när en batch inte har publicerats. Krävs när i buffered_mode är Sant medan valfritt om buffered_mode är Falskt. Återanropsfunktionen ska definieras som: on_error(events, partition_id, error), where:
händelser: Listan över händelser som inte kunde publiceras,
partition_id: Partitions-ID:t som händelserna i listan har försökt publiceras till och
fel: Undantaget som rör sändningsfelet.
Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:
Om ett on_error återanrop skickas under producentklientens instansiering,
sedan skickas felinformation till on_error återanrop, som sedan anropas.
Om ett on_error återanrop inte skickas under klient-instansieringen,
då utlöses felet som standard.
Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:
Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.
Om händelser inte kan skickas efter enqueuing anropas on_error återanrop.
- max_buffer_length
- int
Endast buffrat läge. Det totala antalet händelser per partition som kan bufferas innan en tömning utlöses. Standardvärdet är 1 500 i buffrat läge.
Endast buffrat läge. Hur lång tid det går att vänta på att en batch skapas med händelser i bufferten innan publiceringen. Standardvärdet är 1 i buffrat läge.
- logging_enable
- bool
Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.
- auth_timeout
- float
Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.
- user_agent
- str
Om detta anges läggs detta till framför användaragentsträngen.
- retry_total
- int
Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3.
- retry_backoff_factor
- float
En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.
- retry_backoff_max
- float
Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).
- retry_mode
- str
Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".
- idle_timeout
- float
Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.
- transport_type
- TransportType
Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.
- http_proxy
- dict
HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".
Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.
Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.
- uamqp_transport
- bool
Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.
- socket_timeout
- float
Tiden i sekunder som den underliggande socketen på anslutningen ska vänta när du skickar och tar emot data innan tidsgränsen nås. Standardvärdet är 0.2 för TransportType.Amqp och 1 för TransportType.AmqpOverWebsocket. Om EventHubsConnectionError-fel inträffar på grund av skrivtidsutgång kan ett större värde än standardvärdet behöva skickas in. Detta gäller för avancerade användningsscenarier och normalt bör standardvärdet vara tillräckligt.
Exempel
Skapa en ny instans av EventHubProducerClient.
import os
from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
producer = EventHubProducerClient(fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Metoder
close |
Stäng den underliggande AMQP-anslutningen för producentklienten och länkarna. |
create_batch |
Skapa ett EventDataBatch-objekt med maxstorleken för allt innehåll som begränsas av max_size_in_bytes. Max_size_in_bytes får inte vara större än den maximala tillåtna meddelandestorleken som definieras av tjänsten. |
flush |
Endast buffrat läge. Töm händelser i bufferten som ska skickas omedelbart om klienten arbetar i buffrat läge. |
from_connection_string |
Skapa en EventHubProducerClient från en anslutningssträng. |
get_buffered_event_count |
Antalet händelser som buffrats och väntar på att publiceras för en viss partition. Returnerar Ingen i icke-buffrat läge. Obs! Händelsebufferten bearbetas i en coroutine i bakgrunden. Därför bör antalet händelser i bufferten som rapporteras av detta API endast betraktas som en uppskattning och rekommenderas endast för felsökning. För ett partitions-ID som inte har några buffrade händelser returneras 0 oavsett om partitions-ID:t faktiskt finns i händelsehubben. |
get_eventhub_properties |
Hämta egenskaper för händelsehubben. Nycklar i den returnerade ordlistan är:
|
get_partition_ids |
Hämta partitions-ID:t för händelsehubben. |
get_partition_properties |
Hämta egenskaper för den angivna partitionen. Nycklar i egenskapsordlistan är:
|
send_batch |
Skickar en batch med händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelserna till lokal buffert och returnera. Producenten skickar automatiskt i bakgrunden. Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:
Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:
I buffrat läge förblir sändningen av en batch intakt och skickas som en enda enhet. Batchen ordnas inte om. Detta kan leda till ineffektivitet när händelser skickas. Om du skickar en begränsad lista över EventData eller AmqpAnnotatedMessage och du vet att det är inom storleksgränsen för händelsehubbens ram kan du skicka dem med ett send_batch-anrop . Annars kan du använda create_batch för att skapa EventDataBatch och lägga till EventData eller AmqpAnnotatedMessage i batchen en i taget fram till storleksgränsen och anropa sedan den här metoden för att skicka ut batchen. |
send_event |
Skickar en händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelsen till lokal buffert och returnera. Producenten utför automatisk batchbearbetning och skickar i bakgrunden. Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt: * Om ett on_error återanrop skickas under producentklientens instansiering,
Om buffered_mode är Sant krävs on_error återanrop och felen hanteras på följande sätt: * Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.
|
close
Stäng den underliggande AMQP-anslutningen för producentklienten och länkarna.
async close(*, flush: bool = True, **kwargs: Any) -> None
Parametrar
- flush
- bool
Endast buffrat läge. Om värdet är True skickas händelser i bufferten omedelbart. Standardvärdet är Sant.
Endast buffrat läge. Timeout för att stänga producenten. Standardvärdet är Ingen, vilket innebär att det inte finns någon tidsgräns.
Returtyp
Undantag
Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.
Exempel
Stäng hanteraren.
import os
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
try:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
finally:
# Close down the producer handler.
await producer.close()
create_batch
Skapa ett EventDataBatch-objekt med maxstorleken för allt innehåll som begränsas av max_size_in_bytes.
Max_size_in_bytes får inte vara större än den maximala tillåtna meddelandestorleken som definieras av tjänsten.
async create_batch(*, partition_id: str | None = None, partition_key: str | None = None, max_size_in_bytes: int | None = None) -> EventDataBatch
Returtyp
Undantag
Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.
Exempel
Skapa EventDataBatch-objekt med begränsad storlek
from azure.eventhub import EventData
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
flush
Endast buffrat läge. Töm händelser i bufferten som ska skickas omedelbart om klienten arbetar i buffrat läge.
async flush(**kwargs: Any) -> None
Parametrar
Timeout för att tömma de buffrade händelserna, standardvärdet är Ingen, vilket innebär att det inte finns någon tidsgräns.
Returtyp
Undantag
Om producenten inte kan tömma bufferten inom den angivna tidsgränsen i buffrat läge.
from_connection_string
Skapa en EventHubProducerClient från en anslutningssträng.
from_connection_string(conn_str: str, *, eventhub_name: str | None = None, buffered_mode: bool = False, on_error: Callable[[List[EventData | AmqpAnnotatedMessage], str | None, Exception], Awaitable[None]] | None = None, on_success: Callable[[List[EventData | AmqpAnnotatedMessage], str | None], Awaitable[None]] | None = None, max_buffer_length: int | None = None, max_wait_time: float | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, **kwargs: Any) -> EventHubProducerClient
Parametrar
- eventhub_name
- str
Sökvägen till den specifika händelsehubb som klienten ska anslutas till.
- buffered_mode
- bool
Om sant samlar producentklienten in händelser i en buffert, effektivt batch och publicerar sedan. Standardvärdet är Falskt.
Återanropet anropas när en batch har publicerats. Återanropet tar två parametrar:
händelser: Listan över händelser som har publicerats
partition_id: Det partitions-ID som händelserna i listan har publicerats till.
Återanropsfunktionen ska definieras som: on_success(events, partition_id). Det krävs när buffered_mode är Sant medan det är valfritt om buffered_mode är Falskt.
Återanropet anropas när en batch inte har publicerats. Återanropsfunktionen ska definieras som: on_error(events, partition_id, error), where:
händelser: Listan över händelser som inte kunde publiceras,
partition_id: Partitions-ID:t som händelserna i listan har försökt publiceras till och
fel: Undantaget som rör sändningsfelet.
Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:
Om ett on_error återanrop skickas under producentklientens instansiering,
sedan skickas felinformation till on_error återanrop, som sedan anropas.
Om ett on_error återanrop inte skickas under klient-instansieringen,
då utlöses felet som standard.
Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:
Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.
Om händelser inte kan skickas efter enqueuing anropas on_error återanrop.
- max_buffer_length
- int
Endast buffrat läge. Det totala antalet händelser per partition som kan bufferas innan en tömning utlöses. Standardvärdet är 1 500 i buffrat läge.
Endast buffrat läge. Hur lång tid det går att vänta på att en batch skapas med händelser i bufferten innan publiceringen. Standardvärdet är 1 i buffrat läge.
- logging_enable
- bool
Om nätverksspårningsloggar ska matas ut till loggarna. Standardvärdet är Falskt.
- http_proxy
- dict
HTTP-proxyinställningar. Detta måste vara en ordlista med följande nycklar: "proxy_hostname" (str-värde) och "proxy_port" (int-värde). Dessutom kan följande nycklar också finnas: "användarnamn", "lösenord".
- auth_timeout
- float
Tiden i sekunder att vänta på att en token ska auktoriseras av tjänsten. Standardvärdet är 60 sekunder. Om värdet är 0 tillämpas ingen tidsgräns från klienten.
- user_agent
- str
Om detta anges läggs detta till framför användaragentsträngen.
- retry_total
- int
Det totala antalet försök att göra om en misslyckad åtgärd när ett fel inträffar. Standardvärdet är 3.
- retry_backoff_factor
- float
En backoff-faktor som ska tillämpas mellan försök efter det andra försöket (de flesta fel löses omedelbart med ett andra försök utan fördröjning). I fast läge kommer återförsöksprincipen alltid att vara i viloläge för {backoff factor}. I exponentiellt läge försätts återförsöksprincipen i viloläge för: {backoff factor} * (2 ** ({antal totala återförsök} – 1)) sekunder. Om backoff_factor är 0,1 kommer återförsöket att viloläge för [0,0s, 0,2s, 0,4s, ...] mellan återförsök. Standardvärdet är 0,8.
- retry_backoff_max
- float
Maximal ledighetstid. Standardvärdet är 120 sekunder (2 minuter).
- retry_mode
- str
Fördröjningsbeteendet mellan återförsök. Värden som stöds är "fasta" eller "exponentiella", där standardvärdet är "exponentiellt".
- idle_timeout
- float
Timeout, i sekunder, varefter den här klienten stänger den underliggande anslutningen om det inte finns någon aktivitet. Som standard är värdet Ingen, vilket innebär att klienten inte stängs av på grund av inaktivitet om det inte initieras av tjänsten.
- transport_type
- TransportType
Den typ av transportprotokoll som ska användas för kommunikation med Event Hubs-tjänsten. Standard är TransportType.Amqp i vilket fall port 5671 används. Om port 5671 inte är tillgänglig/blockerad i nätverksmiljön kan TransportType.AmqpOverWebsocket användas i stället som använder port 443 för kommunikation.
Den anpassade slutpunktsadressen som ska användas för att upprätta en anslutning till Event Hubs-tjänsten, så att nätverksbegäranden kan dirigeras via alla programgatewayer eller andra sökvägar som behövs för värdmiljön. Standardvärdet är Ingen. Formatet skulle vara som "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Om porten inte anges i custom_endpoint_address används port 443 som standard.
Sökväg till den anpassade CA_BUNDLE-filen för SSL-certifikatet som används för att autentisera anslutningsslutpunktens identitet. Standard är Ingen i vilket fall certifi.where() kommer att användas.
- uamqp_transport
- bool
Om du vill använda uamqp-biblioteket som den underliggande transporten. Standardvärdet är False och Pure Python AMQP-biblioteket används som underliggande transport.
Returtyp
Undantag
Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.
Exempel
Skapa en ny instans av EventHubProducerClient från anslutningssträng.
import os
from azure.eventhub.aio import EventHubProducerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
producer = EventHubProducerClient.from_connection_string(
conn_str=event_hub_connection_str,
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_buffered_event_count
Antalet händelser som buffrats och väntar på att publiceras för en viss partition. Returnerar Ingen i icke-buffrat läge. Obs! Händelsebufferten bearbetas i en coroutine i bakgrunden. Därför bör antalet händelser i bufferten som rapporteras av detta API endast betraktas som en uppskattning och rekommenderas endast för felsökning. För ett partitions-ID som inte har några buffrade händelser returneras 0 oavsett om partitions-ID:t faktiskt finns i händelsehubben.
get_buffered_event_count(partition_id: str) -> int | None
Parametrar
Returtyp
Undantag
Om ett fel uppstod vid tömning av bufferten om tömning är inställt på Sant eller stänger de underliggande AMQP-anslutningarna i buffrat läge.
get_eventhub_properties
Hämta egenskaper för händelsehubben.
Nycklar i den returnerade ordlistan är:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Returer
En ordlista som innehåller information om händelsehubben.
Returtyp
Undantag
get_partition_ids
Hämta partitions-ID:t för händelsehubben.
async get_partition_ids() -> List[str]
Returer
En lista över partitions-ID:t.
Returtyp
Undantag
get_partition_properties
Hämta egenskaper för den angivna partitionen.
Nycklar i egenskapsordlistan är:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parametrar
Returer
En dikta med partitionsegenskaper.
Returtyp
Undantag
send_batch
Skickar en batch med händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelserna till lokal buffert och returnera. Producenten skickar automatiskt i bakgrunden.
Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt:
Om ett on_error återanrop skickas under producentklientens instansiering,
sedan skickas felinformation till on_error återanrop, som sedan anropas.
Om ett on_error återanrop inte skickas under klient-instansieringen,
då utlöses felet som standard.
Om buffered_mode är Sant krävs on_error återanrop och fel hanteras på följande sätt:
Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.
Om händelser inte kan skickas efter enqueuing anropas on_error återanrop.
I buffrat läge förblir sändningen av en batch intakt och skickas som en enda enhet. Batchen ordnas inte om. Detta kan leda till ineffektivitet när händelser skickas.
Om du skickar en begränsad lista över EventData eller AmqpAnnotatedMessage och du vet att det är inom storleksgränsen för händelsehubbens ram kan du skicka dem med ett send_batch-anrop . Annars kan du använda create_batch för att skapa EventDataBatch och lägga till EventData eller AmqpAnnotatedMessage i batchen en i taget fram till storleksgränsen och anropa sedan den här metoden för att skicka ut batchen.
async send_batch(event_data_batch: EventDataBatch | List[EventData | AmqpAnnotatedMessage], **kwargs: Any) -> None
Parametrar
- event_data_batch
- Union[EventDataBatch, List[Union[EventData, AmqpAnnotatedMessage]]
EventDataBatch-objektet som ska skickas eller en lista över EventData som ska skickas i en batch. Alla EventData eller AmqpAnnotatedMessage i listan eller EventDataBatch hamnar på samma partition.
- timeout
- float
Den maximala väntetiden för att skicka händelsedata i icke-buffrat läge eller den maximala väntetiden för att skicka händelsedata till bufferten i buffrat läge. I icke-buffrat läge används den standardväntetid som angavs när producenten skapades. I buffrat läge är standardväntetiden Ingen.
- partition_id
- str
Det specifika partitions-ID som ska skickas till. Standard är Ingen, i vilket fall tjänsten tilldelar till alla partitioner med hjälp av resursallokering. En TypeError aktiveras om partition_id anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_id.
- partition_key
- str
Med den angivna partition_key skickas händelsedata till en viss partition av händelsehubben som bestäms av tjänsten. En TypeError aktiveras om partition_key anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_key. Om både partition_id och partition_key anges har partition_id företräde. VARNING! Det rekommenderas inte att ange partition_key för icke-strängvärde för de händelser som ska skickas eftersom partition_key ignoreras av Event Hub-tjänsten och händelser tilldelas till alla partitioner med resursallokering. Dessutom finns det SDK:er för användning av händelser som förväntar sig att partition_key endast är strängtyp. De kan misslyckas med att parsa värdet som inte är sträng.
Returtyp
Undantag
Om värdet som anges av timeout-parametern förflutit innan händelsen kan skickas i icke-buffrat läge eller om händelserna kan placeras i buffrat läge.
Exempel
Skickar händelsedata asynkront
async with producer:
event_data_batch = await producer.create_batch()
while True:
try:
event_data_batch.add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
await producer.send_batch(event_data_batch)
send_event
Skickar en händelsedata. Som standard blockeras metoden tills bekräftelsen tas emot eller åtgärden överskrider tidsgränsen. Om EventHubProducerClient har konfigurerats för att köras i buffrat läge kommer -metoden att skicka händelsen till lokal buffert och returnera. Producenten utför automatisk batchbearbetning och skickar i bakgrunden.
Om buffered_mode är Falskt är on_error återanrop valfritt och fel hanteras på följande sätt: * Om ett on_error återanrop skickas under producentklientens instansiering,
then error information will be passed to the *on_error* callback, which will then be called.
* If an *on_error* callback is not passed in during client instantiation,
then the error will be raised by default.
Om buffered_mode är Sant krävs on_error återanrop och felen hanteras på följande sätt: * Om händelser inte visas inom den angivna tidsgränsen utlöses ett fel direkt.
* If events fail to send after enqueuing successfully, the *on_error* callback will be called.
async send_event(event_data: EventData | AmqpAnnotatedMessage, **kwargs: Any) -> None
Parametrar
- timeout
- float
Den maximala väntetiden för att skicka händelsedata i icke-buffrat läge eller den maximala väntetiden för att skicka händelsedata till bufferten i buffrat läge. I icke-buffrat läge används den standardväntetid som angavs när producenten skapades. I buffrat läge är standardväntetiden Ingen.
- partition_id
- str
Det specifika partitions-ID som ska skickas till. Standard är Ingen, i vilket fall tjänsten tilldelar till alla partitioner med hjälp av resursallokering. En TypeError aktiveras om partition_id anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_id.
- partition_key
- str
Med den angivna partition_key skickas händelsedata till en viss partition av händelsehubben som bestäms av tjänsten. En TypeError aktiveras om partition_key anges och event_data_batch är en EventDataBatch eftersom Själva EventDataBatch har partition_key. Om både partition_id och partition_key anges har partition_id företräde. VARNING! Det rekommenderas inte att ange partition_key för icke-strängvärde för de händelser som ska skickas eftersom partition_key ignoreras av Event Hub-tjänsten och händelser tilldelas till alla partitioner med resursallokering. Dessutom finns det SDK:er för användning av händelser som förväntar sig att partition_key endast är strängtyp. De kan misslyckas med att parsa värdet som inte är sträng.
Returtyp
Undantag
Om värdet som anges av timeout-parametern förflutit innan händelsen kan skickas i icke-buffrat läge eller om händelserna inte kan placeras i buffrat läge.
Attribut
total_buffered_event_count
Det totala antalet händelser som för närvarande buffrats och väntar på att publiceras över alla partitioner. Returnerar Ingen i icke-buffrat läge. Obs! Händelsebufferten bearbetas i en coroutine i bakgrunden. Därför bör antalet händelser i bufferten som rapporteras av detta API endast betraktas som en uppskattning och rekommenderas endast för felsökning.
Returtyp
Azure SDK for Python