Partager via


bibliothèque cliente Azure Event Hubs pour Python - version 5.11.5

Azure Event Hubs est un service de publication-abonnement hautement évolutif qui peut ingérer des millions d’événements par seconde et les diffuser en continu vers plusieurs consommateurs. Cela vous permet de traiter et d’analyser les énormes quantités de données produites par vos appareils et applications connectés. Une fois qu’Event Hubs a collecté les données, vous pouvez les récupérer, les transformer et les stocker à l’aide de n’importe quel fournisseur d’analyse en temps réel ou avec des adaptateurs de traitement par lots/stockage. Si vous souhaitez en savoir plus sur Azure Event Hubs, vous pouvez consulter : Qu’est-ce qu’Event Hubs ?

La bibliothèque de client Azure Event Hubs permet de publier et de consommer des événements Azure Event Hubs et peut être utilisée pour :

  • Émettre des données de télémétrie sur votre application à des fins décisionnelles et de diagnostic.
  • Publier des faits à propos de l’état de votre application que les parties intéressées peuvent observer et utiliser comme déclencheur d’une action.
  • Observer les opérations et les interactions intéressantes qui se produisent au sein de votre entreprise ou d’un autre écosystème, ce qui permet aux systèmes faiblement couplés d’interagir sans qu’il soit nécessaire de les lier ensemble.
  • Recevoir les événements d’un ou de plusieurs serveurs de publication, les transformer pour qu’ils répondent mieux aux besoins de votre écosystème, puis publier les événements transformés dans un nouveau flux à l’attention des consommateurs.

| Code sourcePackage (PyPi) | Package (Conda) | Documentation de référence sur les | API | Documentation produitÉchantillons

Prise en main

Prérequis

  • Python 3.7 ou version ultérieure.

  • Abonnement Microsoft Azure : Pour utiliser les services Azure, y compris Azure Event Hubs, vous avez besoin d’un abonnement. Si vous n’avez pas de compte Azure existant, vous pouvez vous inscrire à un essai gratuit ou utiliser les avantages de votre abonné MSDN lorsque vous créez un compte.

  • Espace de noms Event Hubs avec un hub d’événements : Pour interagir avec Azure Event Hubs, vous devez également disposer d’un espace de noms et d’event Hub. Si vous n’êtes pas familiarisé avec la création de ressources Azure, vous pouvez suivre le guide pas à pas pour créer un Event Hub à l’aide du Portail Azure. Vous y trouverez également des instructions détaillées sur l’utilisation des modèles Azure CLI, Azure PowerShell ou Azure Resource Manager (ARM) pour créer un hub d’événements.

Installer le package

Installez la bibliothèque cliente Azure Event Hubs pour Python avec pip :

$ pip install azure-eventhub

Authentifier le client

L’interaction avec Event Hubs commence par une instance de la classe EventHubConsumerClient ou EventHubProducerClient. Vous avez besoin du nom d’hôte, des informations d’identification SAS/AAD et du nom du hub d’événements ou d’un chaîne de connexion pour instancier l’objet client.

Créez un client à partir de chaîne de connexion :

Pour que la bibliothèque cliente Event Hubs interagit avec un hub d’événements, le moyen le plus simple consiste à utiliser un chaîne de connexion, qui est créé automatiquement lors de la création d’un espace de noms Event Hubs. Si vous n’êtes pas familiarisé avec les stratégies d’accès partagé dans Azure, vous pouvez suivre le guide pas à pas pour obtenir un chaîne de connexion Event Hubs.

  • La from_connection_string méthode prend le chaîne de connexion du formulaire Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> et du nom d’entité à votre instance Event Hub. Vous pouvez obtenir cette chaîne de connexion sur le portail Azure.

Créez un client à l’aide de la bibliothèque azure-identity :

Vous pouvez également utiliser un objet Credential pour vous authentifier via AAD avec le package azure-identity.

  • Ce constructeur illustré dans l’exemple lié ci-dessus prend le nom d’hôte et le nom d’entité de votre Event Hub instance et les informations d’identification qui implémentent le protocole TokenCredential. Des implémentations du TokenCredential protocole sont disponibles dans le package azure-identity. Le nom d’hôte est au format <yournamespace.servicebus.windows.net>.
  • Pour utiliser les types d’informations d’identification fournis par azure-identity, installez le package : pip install azure-identity
  • En outre, pour utiliser l’API asynchrone, vous devez d’abord installer un transport asynchrone, tel que aiohttp: pip install aiohttp
  • Lorsque vous utilisez Azure Active Directory, un rôle qui autorise l’accès à Event Hubs doit être attribué à votre principal, tel que le rôle propriétaire des données Azure Event Hubs. Pour plus d’informations sur l’utilisation de l’autorisation Azure Active Directory avec Event Hubs, consultez la documentation associée.

Concepts clés

  • Un EventHubProducerClient est une source de données de télémétrie, d’informations diagnostics, de journaux d’utilisation ou d’autres données de journal, dans le cadre d’une solution d’appareil incorporée, d’une application d’appareil mobile, d’un titre de jeu s’exécutant sur une console ou un autre appareil, d’une solution métier basée sur un client ou un serveur, ou d’un site web.

  • Un EventHubConsumerClient récupère ces informations à partir d’Event Hub et les traite. Le traitement peut impliquer l’agrégation, le calcul complexe et le filtrage. ou encore distribution ou stockage des informations brutes ou transformées. Les consommateurs Event Hub correspondent souvent à des composants d’infrastructure de plateforme robustes à grande échelle intégrant des fonctionnalités d’analytique, par exemple Azure Stream Analytics, Apache Spark ou Apache Storm.

  • Une partition constitue une séquence ordonnée d’événements conservée dans un hub d’événements. Azure Event Hubs assure la diffusion de messages suivant un modèle de consommateur partitionné dans lequel chaque consommateur ne lit qu’un sous-ensemble spécifique, ou partition, du flux de message. Les événements les plus récents sont ajoutés à la fin de cette séquence. Le nombre de partitions est spécifié lors de la création du hub d’événements. Il n’est pas modifiable.

  • Un groupe de consommateurs constitue une vue de tout un hub d’événements. Les groupes de consommateurs permettent à plusieurs applications consommatrices de disposer chacune d’une vue distincte du flux d’événements, et de lire le flux séparément, à son propre rythme et à partir de sa propre position. Il peut y avoir au maximum cinq lecteurs simultanés sur une partition par groupe de consommateurs. Toutefois, il est recommandé de se limiter à un seul consommateur actif pour une association donnée entre une partition et un groupe de consommateurs. Chaque lecteur actif reçoit tous les événements de sa partition. Si plusieurs lecteurs se trouvent sur la même partition, ils reçoivent des événements en double.

Pour plus de concepts et une discussion plus approfondie, consultez Fonctionnalités Event Hubs. En outre, les concepts d’AMQP sont bien documentés dans OASIS Advanced Messaging Queuing Protocol (AMQP) version 1.0.

Sécurité des threads

Nous ne garantissons pas que eventHubProducerClient ou EventHubConsumerClient sont thread-safe. Nous vous déconseillons de réutiliser ces instances entre les threads. Il incombe à l’application en cours d’exécution d’utiliser ces classes de manière thread-safe.

Le type de modèle de données n’est EventDataBatch pas thread-safe. Il ne doit pas être partagé entre les threads ni utilisé simultanément avec les méthodes clientes.

Exemples

Les sections suivantes fournissent plusieurs extraits de code couvrant certaines des tâches Event Hubs les plus courantes, notamment :

Inspection d’un hub d’événements

Obtenez les ID de partition d’un Event Hub.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Publication d’événements sur un hub d’événements

Utilisez la create_batch méthode on EventHubProducerClient pour créer un EventDataBatch objet qui peut ensuite être envoyé à l’aide de la send_batch méthode . Des événements peuvent être ajoutés à l’aide de EventDataBatch la add méthode jusqu’à ce que la limite de taille de lot maximale en octets ait été atteinte.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Utiliser des événements à partir d’un hub d’événements

Il existe plusieurs façons d’utiliser des événements à partir d’un EventHub. Pour déclencher simplement un rappel lorsqu’un événement est reçu, la EventHubConsumerClient.receive méthode sera utilisée comme suit :

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Consommer des événements à partir d’un hub d’événements dans des lots

Alors que l’exemple ci-dessus déclenche le rappel pour chaque message à mesure qu’il est reçu, l’exemple suivant déclenche le rappel sur un lot d’événements, tentant de recevoir un numéro à la fois.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Publier des événements sur un hub d’événements de manière asynchrone

Utilisez la create_batch méthode on EventHubProducer pour créer un EventDataBatch objet qui peut ensuite être envoyé à l’aide de la send_batch méthode . Des événements peuvent être ajoutés à l’aide de EventDataBatch la add méthode jusqu’à ce que la limite de taille de lot maximale en octets ait été atteinte.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Consommer des événements à partir d’un hub d’événements de manière asynchrone

Ce KIT de développement logiciel (SDK) prend en charge le code synchrone et le code asynchrone. Pour recevoir comme indiqué dans les exemples ci-dessus, mais dans aio, il faut les éléments suivants :

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Consommer des événements à partir d’un hub d’événements dans des lots de manière asynchrone

Toutes les fonctions synchrones sont également prises en charge dans aio. Comme indiqué ci-dessus pour la réception de lot synchrone, vous pouvez effectuer la même opération dans asyncio comme suit :

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Consommer des événements et enregistrer des points de contrôle à l’aide d’un magasin de points de contrôle

EventHubConsumerClient est une construction de haut niveau qui vous permet de recevoir des événements de plusieurs partitions à la fois et d’équilibrer la charge avec d’autres consommateurs à l’aide du même hub d’événements et du même groupe de consommateurs.

Cela permet également à l’utilisateur de suivre la progression lorsque les événements sont traités à l’aide de points de contrôle.

Un point de contrôle est destiné à représenter le dernier événement traité avec succès par l’utilisateur à partir d’une partition particulière d’un groupe de consommateurs dans un instance Event Hub. utilise EventHubConsumerClient une instance de pour mettre à jour les points de CheckpointStore contrôle et stocker les informations pertinentes requises par l’algorithme d’équilibrage de charge.

Recherchez pypi avec le préfixe azure-eventhub-checkpointstore pour rechercher les packages qui prennent en charge ce paramètre et utilisez l’implémentation CheckpointStore de l’un de ces packages. Notez que les bibliothèques de synchronisation et de synchronisation asynchrone sont fournies.

Dans l’exemple ci-dessous, nous créons un instance de EventHubConsumerClient et utilisons un BlobCheckpointStore. Vous devez créer un compte stockage Azure et un conteneur d’objets blob pour exécuter le code.

Stockage Blob Azure Checkpoint Store Async et Stockage Blob Azure Checkpoint Store Sync sont l’une CheckpointStore des implémentations que nous fournissons qui s’applique Stockage Blob Azure en tant que magasin persistant.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Utilisez EventHubConsumerClient pour utiliser IoT Hub

Vous pouvez également utiliser EventHubConsumerClient pour utiliser IoT Hub. Cela est utile pour recevoir des données de télémétrie de IoT Hub à partir de l’EventHub lié. Le chaîne de connexion associé n’aura pas de revendications d’envoi. Par conséquent, l’envoi d’événements n’est pas possible.

Notez que le chaîne de connexion doit être destiné à un point de terminaison compatible avec Event Hub, par exemple« Endpoint=sb ://my-iothub-namespace-[uid].servicebus.windows.net/ ; SharedAccessKeyName=my-SA-name ; SharedAccessKey=my-SA-key ; EntityPath=my-iot-hub-name »

Il existe deux façons d’obtenir le point de terminaison compatible Event Hubs :

  • Obtenez manuellement les « points de terminaison intégrés » du IoT Hub dans le portail Azure et recevez-en.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Dépannage

Consultez le azure-eventhubsguide de résolution des problèmes pour plus d’informations sur la façon de diagnostiquer différents scénarios d’échec.

Étapes suivantes

Autres exemples de code

Consultez le répertoire d’exemples pour obtenir des exemples détaillés sur l’utilisation de cette bibliothèque pour envoyer et recevoir des événements vers/à partir d’Event Hubs.

Documentation

La documentation de référence est disponible ici.

Registre de schémas et Avro Encoder

Le Kit de développement logiciel (SDK) EventHubs s’intègre parfaitement avec le service Registre de schémas et Avro. Pour plus d’informations, consultez le Kit de développement logiciel (SDK) du Registre de schémas et le Kit de développement logiciel (SDK) Avro Encoder du Registre de schémas.

Prise en charge de la compatibilité descendante et du transport AMQP Pure Python

La bibliothèque cliente Azure Event Hubs est désormais basée sur une implémentation pure de Python AMQP. uAMQP a été supprimé en tant que dépendance requise.

À utiliser uAMQP comme transport sous-jacent :

  1. Installez uamqp avec pip.
$ pip install uamqp 
  1. Passer pendant la uamqp_transport=True construction du client.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Remarque : l’attribut message surEventDataBatchEventData/ , qui exposait précédemment le uamqp.Message, a été déprécié. Les objets « hérités » retournés par EventData.message/EventDataBatch.message ont été introduits pour faciliter la transition.

Génération d’une roue uAMQP à partir de la source

Si uAMQP est destiné à être utilisé comme implémentation du protocole AMQP sous-jacent pour azure-eventhub, les roues uAMQP sont disponibles pour la plupart des principaux systèmes d’exploitation.

Si vous envisagez d’utiliser uAMQP et que vous exécutez sur une plateforme pour laquelle les roues uAMQP ne sont pas fournies, suivez les instructions d’installation d’uAMQP pour installer à partir de la source.

Fournir des commentaires

Si vous rencontrez des bogues ou si vous avez des suggestions, signalez un problème dans la section Problèmes du projet.

Contribution

Ce projet accepte les contributions et les suggestions. La plupart des contributions vous demandent d’accepter un contrat de licence de contribution (CLA) déclarant que vous avez le droit de nous accorder, et que vous nous accordez réellement, les droits d’utilisation de votre contribution. Pour plus d’informations, visitez https://cla.microsoft.com.

Quand vous envoyez une demande de tirage (pull request), un bot CLA détermine automatiquement si vous devez fournir un contrat CLA et agrémenter la demande de tirage de façon appropriée (par exemple, avec une étiquette ou un commentaire). Suivez simplement les instructions fournies par le bot. Vous ne devez effectuer cette opération qu’une seule fois sur tous les dépôts utilisant notre contrat CLA.

Ce projet a adopté le Code de conduite Open Source de Microsoft. Pour plus d’informations, consultez les Questions fréquentes (FAQ) sur le code de conduite ou envoyez vos questions ou vos commentaires à opencode@microsoft.com.

Impressions