EventHubConsumerClient Klasse
Die EventHubConsumerClient-Klasse definiert eine allgemeine Schnittstelle zum Empfangen von Ereignissen vom Azure Event Hubs-Dienst.
Das Standard Ziel von EventHubConsumerClient besteht darin, Ereignisse von allen Partitionen eines EventHub mit Lastenausgleich und Prüfpunkting zu empfangen.
Wenn mehrere EventHubConsumerClient-Instanzen für denselben Event Hub, dieselbe Consumergruppe und denselben Prüfpunktspeicherort ausgeführt werden, werden die Partitionen gleichmäßig auf sie verteilt.
Um den Lastenausgleich und persistente Prüfpunkte zu aktivieren, muss beim Erstellen von EventHubConsumerClient checkpoint_store festgelegt werden. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet.
Ein EventHubConsumerClient kann auch von einer bestimmten Partition empfangen, wenn Sie die Methode receive() oder receive_batch() aufrufen und die partition_id angeben. Der Lastenausgleich funktioniert nicht im Einzelpartitionsmodus. Benutzer können jedoch weiterhin Prüfpunkte speichern, wenn die checkpoint_store festgelegt ist.
- Vererbung
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
Konstruktor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parameter
- fully_qualified_namespace
- str
Der vollqualifizierte Hostname für den Event Hubs-Namespace. Das Namespaceformat ist . servicebus.windows.net.
- eventhub_name
- str
Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.
- credential
- AsyncTokenCredential oder AzureSasCredential oder AzureNamedKeyCredential
Das für die Authentifizierung verwendete Anmeldeinformationsobjekt, das eine bestimmte Schnittstelle zum Abrufen von Token implementiert. Es akzeptiert EventHubSharedKeyCredential, oder Anmeldeinformationsobjekte, die von der azure-identity-Bibliothek generiert werden, und Objekte, die die *get_token(self, scopes) -Methode implementieren.
- logging_enable
- bool
Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.
- auth_timeout
- float
Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.
- user_agent
- str
Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.
- retry_total
- int
Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.
- retry_backoff_factor
- float
Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.
- retry_backoff_max
- float
Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).
- retry_mode
- str
Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.
- idle_timeout
- float
Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine weitere Aktivität erfolgt. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.
- transport_type
- TransportType
Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.
- http_proxy
HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert).
- checkpoint_store
- Optional[CheckpointStore]
Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen verwendet, in denen alle Partitionen oder eine einzelne Partition empfangen werden. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.
- load_balancing_interval
- float
Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist „30 Sekunden“.
- partition_ownership_expiration_interval
- float
Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Bei jeder Lastenausgleichsauswertung wird die Ablaufzeit des Besitzes automatisch verlängert. Der Standardwert ist 6 * load_balancing_interval, d. h. 180 Sekunden, wenn die Standard-load_balancing_interval von 30 Sekunden verwendet wird.
- load_balancing_strategy
- str oder LoadBalancingStrategy
Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "gierig" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen aufnimmt, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die Ausgewogene Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Gierstrategie wird standardmäßig verwendet.
Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.
Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.
- uamqp_transport
- bool
Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.
- socket_timeout
- float
Die Zeit in Sekunden, die der zugrunde liegende Socket für die Verbindung beim Senden und Empfangen von Daten warten soll, bevor ein Timeout auftritt. Der Standardwert ist 0,2 für TransportType.Amqp und 1 für TransportType.AmqpOverWebsocket. Wenn EventHubsConnectionError-Fehler aufgrund eines Schreibzeitlimits auftreten, muss möglicherweise ein größer als der Standardwert übergeben werden. Dies ist für erweiterte Verwendungsszenarien vorgesehen, und normalerweise sollte der Standardwert ausreichend sein.
Beispiele
Erstellen Sie eine neue instance von EventHubConsumerClient.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
Methoden
close |
Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links. |
from_connection_string |
Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge. |
get_eventhub_properties |
Ruft Eigenschaften des Event Hubs ab. Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:
|
get_partition_ids |
Ruft Partitions-IDs des Event Hubs ab. |
get_partition_properties |
Ruft Eigenschaften der angegebenen Partition ab. Zu den Schlüsseln im Eigenschaftenverzeichnis gehören:
|
receive |
Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten. |
receive_batch |
Empfangen von Ereignissen von Partitionen in Batches mit optionalem Lastenausgleich und Prüfpunkting. |
close
Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links.
async close() -> None
Rückgabetyp
Beispiele
Schließen Sie den Client.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
Parameter
- eventhub_name
- str
Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.
- logging_enable
- bool
Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.
- http_proxy
- dict
HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".
- auth_timeout
- float
Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.
- user_agent
- str
Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.
- retry_total
- int
Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.
- retry_backoff_factor
- float
Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.
- retry_backoff_max
- float
Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).
- retry_mode
- str
Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.
- idle_timeout
- float
Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine weitere Aktivität erfolgt. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.
- transport_type
- TransportType
Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.
- checkpoint_store
- Optional[CheckpointStore]
Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen verwendet, in denen alle Partitionen oder eine einzelne Partition empfangen werden. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.
- load_balancing_interval
- float
Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist „30 Sekunden“.
- partition_ownership_expiration_interval
- float
Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Bei jeder Lastenausgleichsauswertung wird die Ablaufzeit des Besitzes automatisch verlängert. Der Standardwert ist 6 * load_balancing_interval, d. h. 180 Sekunden, wenn die Standard-load_balancing_interval von 30 Sekunden verwendet wird.
- load_balancing_strategy
- str oder LoadBalancingStrategy
Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "gierig" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen aufnimmt, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die Ausgewogene Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Gierstrategie wird standardmäßig verwendet.
Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.
Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.
- uamqp_transport
- bool
Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.
Rückgabetyp
Beispiele
Erstellen Sie eine neue instance von EventHubConsumerClient aus Verbindungszeichenfolge.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Ruft Eigenschaften des Event Hubs ab.
Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
Gibt zurück
Ein Wörterbuch, das Informationen zum Event Hub enthält.
Rückgabetyp
Ausnahmen
get_partition_ids
Ruft Partitions-IDs des Event Hubs ab.
async get_partition_ids() -> List[str]
Gibt zurück
Eine Liste der Partitions-IDs.
Rückgabetyp
Ausnahmen
get_partition_properties
Ruft Eigenschaften der angegebenen Partition ab.
Zu den Schlüsseln im Eigenschaftenverzeichnis gehören:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
Parameter
Gibt zurück
Ein Wörterbuch, das Partitionseigenschaften enthält.
Rückgabetyp
Ausnahmen
receive
Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parameter
- on_event
- Callable[PartitionContext, Optional[EventData]]
Die Rückruffunktion für die Behandlung eines empfangenen Ereignisses. Der Rückruf akzeptiert zwei Parameter: partition_context , der den Partitionskontext enthält, und das Ereignis , das das empfangene Ereignis ist. Die Rückruffunktion sollte wie folgt definiert werden: on_event(partition_context, ereignis). Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .
- max_wait_time
- float
Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der rückruf von on_event mit None aufgerufen. Wenn dieser Wert auf None oder 0 (Standard) festgelegt ist, wird der Rückruf erst aufgerufen, wenn ein Ereignis empfangen wird.
- partition_id
- str
Wenn angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.
- owner_level
- int
Die Priorität für einen exklusiven Verbraucher. Wenn owner_level festgelegt ist, wird ein exklusiver Consumer erstellt. Ein Consumer mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.
- prefetch
- int
Die Anzahl der Ereignisse, die vom Dienst zur Verarbeitung vorab abgerufen werden sollen. Der Standardwert ist 300.
- track_last_enqueued_event_properties
- bool
Gibt an, ob der Consumer Informationen zum Ereignis der letzten Warteschlange auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zu den Partitionen in der letzten Warteschlange nachverfolgt werden, enthält jedes ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten zur Partition. Dies führt zu einem geringen zusätzlichen Netzwerkbandbreitenverbrauch, der in der Regel ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mithilfe des Event Hub-Clients berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.
Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Diktat mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt.
Bestimmen Sie, ob die angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Diktat mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob die starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.
- on_error
- Callable[[PartitionContext, Exception]]
Die Rückruffunktion, die aufgerufen wird, wenn während des Empfangs ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückruf wird auch aufgerufen, wenn während des on_event Rückrufs eine ausnahme ausgelöst wird, die nicht behandelt wird.
- on_partition_initialize
- Callable[[PartitionContext]]
Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition die Initialisierung abgeschlossen hat. Sie wird auch aufgerufen, wenn ein neuer interner Partitionsconsumer erstellt wird, um den empfangenden Prozess für einen fehlerhaften und geschlossenen internen Partitionsconsumer zu übernehmen. Der Rückruf verwendet einen einzelnen Parameter: partition_context , der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition geschlossen wurde. Es wird auch aufgerufen, wenn der Fehler während des Empfangs ausgelöst wird, nachdem wiederholungsversuche aufgebraucht sind. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie für CloseReason die verschiedenen Abschlussgründe.
Rückgabetyp
Beispiele
Empfangen von Ereignissen vom EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
Empfangen von Ereignissen von Partitionen in Batches mit optionalem Lastenausgleich und Prüfpunkting.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
Parameter
- on_event_batch
- Callable[PartitionContext, List[EventData]]
Die Rückruffunktion zum Behandeln eines Batches empfangener Ereignisse. Der Rückruf verwendet zwei Parameter: partition_context , der den Partitionskontext enthält, und event_batch, bei dem es sich um die empfangenen Ereignisse handelt. Die Rückruffunktion sollte wie folgt definiert werden: on_event_batch(partition_context, event_batch). event_batch kann eine leere Liste sein, wenn max_wait_time weder None noch 0 ist und nach max_wait_time kein Ereignis empfangen wird. Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .
- max_batch_size
- int
Die maximale Anzahl von Ereignissen in einem Batch, die an den Rückruf übergeben on_event_batch. Wenn die tatsächliche empfangene Anzahl von Ereignissen größer als max_batch_size ist, werden die empfangenen Ereignisse in Batches unterteilt und rufen den Rückruf für jeden Batch mit bis zu max_batch_size Ereignissen auf.
- max_wait_time
- float
Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der on_event_batch Rückruf mit einer leeren Liste aufgerufen. Wenn dieser Wert auf None oder 0 (Standard) festgelegt ist, wird der Rückruf erst aufgerufen, wenn Ereignisse empfangen werden.
- partition_id
- str
Wenn angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.
- owner_level
- int
Die Priorität für einen exklusiven Verbraucher. Wenn owner_level festgelegt ist, wird ein exklusiver Consumer erstellt. Ein Consumer mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.
- prefetch
- int
Die Anzahl der Ereignisse, die vom Dienst zur Verarbeitung vorab abgerufen werden sollen. Der Standardwert ist 300.
- track_last_enqueued_event_properties
- bool
Gibt an, ob der Consumer Informationen zum Ereignis der letzten Warteschlange auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zu den Partitionen in der letzten Warteschlange nachverfolgt werden, enthält jedes ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten zur Partition. Dies führt zu einem geringen zusätzlichen Netzwerkbandbreitenverbrauch, der in der Regel ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mithilfe des Event Hub-Clients berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.
Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Diktat mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt.
Bestimmen Sie, ob die angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Diktat mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob die starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.
- on_error
- Callable[[PartitionContext, Exception]]
Die Rückruffunktion, die aufgerufen wird, wenn während des Empfangs ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückruf wird auch aufgerufen, wenn während des on_event Rückrufs eine ausnahme ausgelöst wird, die nicht behandelt wird.
- on_partition_initialize
- Callable[[PartitionContext]]
Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition die Initialisierung abgeschlossen hat. Sie wird auch aufgerufen, wenn ein neuer interner Partitionsconsumer erstellt wird, um den empfangenden Prozess für einen fehlerhaften und geschlossenen internen Partitionsconsumer zu übernehmen. Der Rückruf verwendet einen einzelnen Parameter: partition_context , der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition geschlossen wurde. Es wird auch aufgerufen, wenn der Fehler während des Empfangs ausgelöst wird, nachdem wiederholungsversuche aufgebraucht sind. Der Rückruf verwendet zwei Parameter: partition_context , der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie für CloseReason die verschiedenen Abschlussgründe.
Rückgabetyp
Beispiele
Empfangen von Ereignissen in Batches vom EventHub.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python