Freigeben über


EventHubConsumerClient Klasse

Die EventHubConsumerClient-Klasse definiert eine allgemeine Schnittstelle zum Empfangen von Ereignissen vom Azure Event Hubs-Dienst.

Das Standard Ziel von EventHubConsumerClient besteht darin, Ereignisse von allen Partitionen eines EventHub mit Lastenausgleich und Prüfpunkting zu empfangen.

Wenn mehrere EventHubConsumerClient-Instanzen für denselben Event Hub, dieselbe Consumergruppe und denselben Prüfpunktspeicherort ausgeführt werden, werden die Partitionen gleichmäßig auf sie verteilt.

Um den Lastenausgleich und persistente Prüfpunkte zu aktivieren, muss beim Erstellen von EventHubConsumerClient checkpoint_store festgelegt werden. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet.

Ein EventHubConsumerClient kann auch von einer bestimmten Partition empfangen, wenn Sie die Methode receive() oder receive_batch() aufrufen und die partition_id angeben. Der Lastenausgleich funktioniert nicht im Einzelpartitionsmodus. Benutzer können jedoch weiterhin Prüfpunkte speichern, wenn die checkpoint_store festgelegt ist.

Vererbung
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parameter

fully_qualified_namespace
str
Erforderlich

Der vollqualifizierte Hostname für den Event Hubs-Namespace. Das Namespaceformat ist . servicebus.windows.net.

eventhub_name
str
Erforderlich

Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.

consumer_group
str
Erforderlich

Empfangen von Ereignissen vom Event Hub für diese Consumergruppe.

credential
AsyncTokenCredential oder AzureSasCredential oder AzureNamedKeyCredential
Erforderlich

Das für die Authentifizierung verwendete Anmeldeinformationsobjekt, das eine bestimmte Schnittstelle zum Abrufen von Token implementiert. Es akzeptiert EventHubSharedKeyCredential, oder Anmeldeinformationsobjekte, die von der azure-identity-Bibliothek generiert werden, und Objekte, die die *get_token(self, scopes) -Methode implementieren.

logging_enable
bool

Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.

auth_timeout
float

Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.

user_agent
str

Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.

retry_total
int

Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.

retry_backoff_factor
float

Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.

retry_backoff_max
float

Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).

retry_mode
str

Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.

idle_timeout
float

Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine weitere Aktivität erfolgt. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.

transport_type
TransportType

Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.

http_proxy

HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert).

checkpoint_store
Optional[CheckpointStore]

Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen verwendet, in denen alle Partitionen oder eine einzelne Partition empfangen werden. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.

load_balancing_interval
float

Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist „30 Sekunden“.

partition_ownership_expiration_interval
float

Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Bei jeder Lastenausgleichsauswertung wird die Ablaufzeit des Besitzes automatisch verlängert. Der Standardwert ist 6 * load_balancing_interval, d. h. 180 Sekunden, wenn die Standard-load_balancing_interval von 30 Sekunden verwendet wird.

load_balancing_strategy
str oder LoadBalancingStrategy

Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "gierig" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen aufnimmt, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die Ausgewogene Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Gierstrategie wird standardmäßig verwendet.

custom_endpoint_address
Optional[str]

Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.

connection_verify
Optional[str]

Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.

uamqp_transport
bool

Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.

socket_timeout
float

Die Zeit in Sekunden, die der zugrunde liegende Socket für die Verbindung beim Senden und Empfangen von Daten warten soll, bevor ein Timeout auftritt. Der Standardwert ist 0,2 für TransportType.Amqp und 1 für TransportType.AmqpOverWebsocket. Wenn EventHubsConnectionError-Fehler aufgrund eines Schreibzeitlimits auftreten, muss möglicherweise ein größer als der Standardwert übergeben werden. Dies ist für erweiterte Verwendungsszenarien vorgesehen, und normalerweise sollte der Standardwert ausreichend sein.

Beispiele

Erstellen Sie eine neue instance von EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Methoden

close

Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links.

from_connection_string

Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge.

get_eventhub_properties

Ruft Eigenschaften des Event Hubs ab.

Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Ruft Partitions-IDs des Event Hubs ab.

get_partition_properties

Ruft Eigenschaften der angegebenen Partition ab.

Zu den Schlüsseln im Eigenschaftenverzeichnis gehören:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.

receive_batch

Empfangen von Ereignissen von Partitionen in Batches mit optionalem Lastenausgleich und Prüfpunkting.

close

Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links.

async close() -> None

Rückgabetyp

Beispiele

Schließen Sie den Client.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parameter

conn_str
str
Erforderlich

Die Verbindungszeichenfolge eines Event Hubs.

consumer_group
str
Erforderlich

Empfangen von Ereignissen vom Event Hub für diese Consumergruppe.

eventhub_name
str

Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.

logging_enable
bool

Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.

http_proxy
dict

HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".

auth_timeout
float

Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.

user_agent
str

Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.

retry_total
int

Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.

retry_backoff_factor
float

Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.

retry_backoff_max
float

Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).

retry_mode
str

Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.

idle_timeout
float

Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine weitere Aktivität erfolgt. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.

transport_type
TransportType

Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.

checkpoint_store
Optional[CheckpointStore]

Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen verwendet, in denen alle Partitionen oder eine einzelne Partition empfangen werden. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.

load_balancing_interval
float

Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist „30 Sekunden“.

partition_ownership_expiration_interval
float

Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Bei jeder Lastenausgleichsauswertung wird die Ablaufzeit des Besitzes automatisch verlängert. Der Standardwert ist 6 * load_balancing_interval, d. h. 180 Sekunden, wenn die Standard-load_balancing_interval von 30 Sekunden verwendet wird.

load_balancing_strategy
str oder LoadBalancingStrategy

Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "gierig" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen aufnimmt, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die Ausgewogene Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Gierstrategie wird standardmäßig verwendet.

custom_endpoint_address
Optional[str]

Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.

connection_verify
Optional[str]

Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.

uamqp_transport
bool

Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.

Rückgabetyp

Beispiele

Erstellen Sie eine neue instance von EventHubConsumerClient aus Verbindungszeichenfolge.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Ruft Eigenschaften des Event Hubs ab.

Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Gibt zurück

Ein Wörterbuch, das Informationen zum Event Hub enthält.

Rückgabetyp

Ausnahmen

get_partition_ids

Ruft Partitions-IDs des Event Hubs ab.

async get_partition_ids() -> List[str]

Gibt zurück

Eine Liste der Partitions-IDs.

Rückgabetyp

Ausnahmen

get_partition_properties

Ruft Eigenschaften der angegebenen Partition ab.

Zu den Schlüsseln im Eigenschaftenverzeichnis gehören:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameter

partition_id
str
Erforderlich

Die Zielpartitions-ID.

Gibt zurück

Ein Wörterbuch, das Partitionseigenschaften enthält.

Rückgabetyp

Ausnahmen

receive

Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parameter

on_event
Callable[PartitionContext, Optional[EventData]]
Erforderlich

Die Rückruffunktion für die Behandlung eines empfangenen Ereignisses. Der Rückruf akzeptiert zwei Parameter: partition_context , der den Partitionskontext enthält, und das Ereignis , das das empfangene Ereignis ist. Die Rückruffunktion sollte wie folgt definiert werden: on_event(partition_context, ereignis). Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .

max_wait_time
float

Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der rückruf von on_event mit None aufgerufen. Wenn dieser Wert auf None oder 0 (Standard) festgelegt ist, wird der Rückruf erst aufgerufen, wenn ein Ereignis empfangen wird.

partition_id
str

Wenn angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.

owner_level
int

Die Priorität für einen exklusiven Verbraucher. Wenn owner_level festgelegt ist, wird ein exklusiver Consumer erstellt. Ein Consumer mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.

prefetch
int

Die Anzahl der Ereignisse, die vom Dienst zur Verarbeitung vorab abgerufen werden sollen. Der Standardwert ist 300.

track_last_enqueued_event_properties
bool

Gibt an, ob der Consumer Informationen zum Ereignis der letzten Warteschlange auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zu den Partitionen in der letzten Warteschlange nachverfolgt werden, enthält jedes ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten zur Partition. Dies führt zu einem geringen zusätzlichen Netzwerkbandbreitenverbrauch, der in der Regel ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mithilfe des Event Hub-Clients berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.

starting_position
str, int, datetime oder dict[str,any]

Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Diktat mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt.

starting_position_inclusive
bool oder dict[str,bool]

Bestimmen Sie, ob die angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Diktat mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob die starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.

on_error
Callable[[PartitionContext, Exception]]

Die Rückruffunktion, die aufgerufen wird, wenn während des Empfangs ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückruf wird auch aufgerufen, wenn während des on_event Rückrufs eine ausnahme ausgelöst wird, die nicht behandelt wird.

on_partition_initialize
Callable[[PartitionContext]]

Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition die Initialisierung abgeschlossen hat. Sie wird auch aufgerufen, wenn ein neuer interner Partitionsconsumer erstellt wird, um den empfangenden Prozess für einen fehlerhaften und geschlossenen internen Partitionsconsumer zu übernehmen. Der Rückruf verwendet einen einzelnen Parameter: partition_context , der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition geschlossen wurde. Es wird auch aufgerufen, wenn der Fehler während des Empfangs ausgelöst wird, nachdem wiederholungsversuche aufgebraucht sind. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie für CloseReason die verschiedenen Abschlussgründe.

Rückgabetyp

Beispiele

Empfangen von Ereignissen vom EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Empfangen von Ereignissen von Partitionen in Batches mit optionalem Lastenausgleich und Prüfpunkting.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parameter

on_event_batch
Callable[PartitionContext, List[EventData]]
Erforderlich

Die Rückruffunktion zum Behandeln eines Batches empfangener Ereignisse. Der Rückruf verwendet zwei Parameter: partition_context , der den Partitionskontext enthält, und event_batch, bei dem es sich um die empfangenen Ereignisse handelt. Die Rückruffunktion sollte wie folgt definiert werden: on_event_batch(partition_context, event_batch). event_batch kann eine leere Liste sein, wenn max_wait_time weder None noch 0 ist und nach max_wait_time kein Ereignis empfangen wird. Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .

max_batch_size
int

Die maximale Anzahl von Ereignissen in einem Batch, die an den Rückruf übergeben on_event_batch. Wenn die tatsächliche empfangene Anzahl von Ereignissen größer als max_batch_size ist, werden die empfangenen Ereignisse in Batches unterteilt und rufen den Rückruf für jeden Batch mit bis zu max_batch_size Ereignissen auf.

max_wait_time
float

Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der on_event_batch Rückruf mit einer leeren Liste aufgerufen. Wenn dieser Wert auf None oder 0 (Standard) festgelegt ist, wird der Rückruf erst aufgerufen, wenn Ereignisse empfangen werden.

partition_id
str

Wenn angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.

owner_level
int

Die Priorität für einen exklusiven Verbraucher. Wenn owner_level festgelegt ist, wird ein exklusiver Consumer erstellt. Ein Consumer mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.

prefetch
int

Die Anzahl der Ereignisse, die vom Dienst zur Verarbeitung vorab abgerufen werden sollen. Der Standardwert ist 300.

track_last_enqueued_event_properties
bool

Gibt an, ob der Consumer Informationen zum Ereignis der letzten Warteschlange auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zu den Partitionen in der letzten Warteschlange nachverfolgt werden, enthält jedes ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten zur Partition. Dies führt zu einem geringen zusätzlichen Netzwerkbandbreitenverbrauch, der in der Regel ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mithilfe des Event Hub-Clients berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.

starting_position
str, int, datetime oder dict[str,any]

Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Diktat mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt.

starting_position_inclusive
bool oder dict[str,bool]

Bestimmen Sie, ob die angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Diktat mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob die starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.

on_error
Callable[[PartitionContext, Exception]]

Die Rückruffunktion, die aufgerufen wird, wenn während des Empfangs ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückruf wird auch aufgerufen, wenn während des on_event Rückrufs eine ausnahme ausgelöst wird, die nicht behandelt wird.

on_partition_initialize
Callable[[PartitionContext]]

Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition die Initialisierung abgeschlossen hat. Sie wird auch aufgerufen, wenn ein neuer interner Partitionsconsumer erstellt wird, um den empfangenden Prozess für einen fehlerhaften und geschlossenen internen Partitionsconsumer zu übernehmen. Der Rückruf verwendet einen einzelnen Parameter: partition_context , der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition geschlossen wurde. Es wird auch aufgerufen, wenn der Fehler während des Empfangs ausgelöst wird, nachdem wiederholungsversuche aufgebraucht sind. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie für CloseReason die verschiedenen Abschlussgründe.

Rückgabetyp

Beispiele

Empfangen von Ereignissen in Batches vom EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )