Bibliothèque cliente du Magasin de points de contrôle Azure EventHubs pour Python - version 1.1.4
utilisation d’objets blob de stockage
Le magasin de points de contrôle Azure EventHubs est utilisé pour stocker des points de contrôle lors du traitement des événements de Azure Event Hubs.
Ce package Checkpoint Store fonctionne comme un package de plug-in pour EventHubConsumerClient
. Il utilise Stockage Blob Azure comme magasin persistant pour la gestion des points de contrôle et des informations de propriété de partition.
Notez qu’il s’agit d’une bibliothèque asynchrone, pour la version de synchronisation de la bibliothèque cliente Azure EventHubs Checkpoint Store, reportez-vous à azure-eventhub-checkpointstoreblob.
| Code sourcePackage (PyPi) | Documentation de référence sur les | API Documentation | Azure EventhubsDocumentation stockage Azure
Prise en main
Prérequis
Python 3.6 ou version ultérieure.
Abonnement Microsoft Azure : Pour utiliser les services Azure, y compris Azure Event Hubs, vous avez besoin d’un abonnement. Si vous n’avez pas de compte Azure existant, vous pouvez vous inscrire à un essai gratuit ou utiliser les avantages de votre abonné MSDN lorsque vous créez un compte.
Espace de noms Event Hubs avec un hub d’événements : Pour interagir avec Azure Event Hubs, vous devez également disposer d’un espace de noms et d’event Hub. Si vous n’êtes pas familiarisé avec la création de ressources Azure, vous pouvez suivre le guide pas à pas pour créer un Event Hub à l’aide du Portail Azure. Vous y trouverez également des instructions détaillées sur l’utilisation des modèles Azure CLI, Azure PowerShell ou Azure Resource Manager (ARM) pour créer un hub d’événements.
Compte stockage Azure : Vous devez disposer d’un compte stockage Azure et créer un conteneur de blocs Stockage Blob Azure pour stocker les données de point de contrôle avec des objets blob. Vous pouvez suivre le guide de création d’un compte de stockage d’objets blob De blocs Azure.
Installer le package
$ pip install azure-eventhub-checkpointstoreblob-aio
Concepts clés
Points de contrôle
Les points de contrôle constituent un processus par lequel les lecteurs marquent ou valident leur position dans une séquence d’événements de partition. La réalisation des points de contrôle est la responsabilité du consommateur et se produit sur une base par partition dans un groupe de consommateurs. Cette responsabilité signifie que pour chaque groupe de consommateurs, chaque lecteur de partition doit conserver une trace de sa position actuelle dans le flux d’événements. Il peut informer le service lorsqu’il considère que le flux de données est complet. Si un lecteur se déconnecte d'une partition, lorsqu'il se reconnecte il commence la lecture au point de contrôle qui a été précédemment soumis par le dernier lecteur de cette partition dans ce groupe de consommateurs. Lorsque le lecteur se connecte, il transmet le décalage à l’Event Hub pour spécifier l’emplacement où commencer la lecture. De cette façon, vous pouvez utiliser les points de contrôle pour marquer les événements comme « terminés » par les applications en aval et pour assurer la résilience si un basculement se produit entre des lecteurs en cours d’exécution sur des ordinateurs différents. Il est possible de revenir à des données plus anciennes en spécifiant un décalage inférieur à partir de ce processus de vérification. Grâce à ce mécanisme, les points de contrôle permettent une résilience au basculement renforcée, mais également la relecture du flux d’événements.
Décalages des nombres de séquences &
Les deux numéros de séquence de décalage & font référence à la position d’un événement dans une partition. Vous pouvez les considérer comme un curseur côté client. Le décalage est une numérotation en octets de l'événement. Le décalage/numéro de séquence permet à un consommateur d’événements (lecteur) de spécifier un point dans le flux d’événements à partir duquel il souhaite commencer la lecture des événements. Vous pouvez spécifier un horodatage de sorte que vous receviez les événements en file d’attente uniquement après l’horodatage donné. Les consommateurs ont la responsabilité de stocker leurs propres valeurs de décalage en dehors du service des hubs d'événements. Dans une partition, chaque événement comprend un décalage, un numéro de séquence et l’horodatage du moment où il a été mis en file d’attente.
Exemples
- Créer un EventHubs Azure
EventHubConsumerClient
- Consommer des événements à l’aide d’un
BlobCheckpointStore
Créer un EventHubConsumerClient
Le moyen le plus simple de créer un EventHubConsumerClient
consiste à utiliser une chaîne de connexion.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Pour obtenir d’autres façons de créer un EventHubConsumerClient
, reportez-vous à la bibliothèque EventHubs pour plus d’informations.
Consommer des événements à l’aide d’un BlobCheckpointStore
point de contrôle pour faire
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Utiliser BlobCheckpointStore
avec une autre version de l’API service de stockage Azure
Certains environnements ont des versions différentes de l’API Service stockage Azure.
BlobCheckpointStore
par défaut, utilise l’API service de stockage version 2019-07-07. Pour l’utiliser sur une autre version, spécifiez api_version
quand vous créez l’objet BlobCheckpointStore
.
Dépannage
Général
L’activation de la journalisation sera utile pour résoudre les problèmes.
Journalisation
- Activez
azure.eventhub.extensions.checkpointstoreblobaio
l’enregistreur d’événements pour collecter des traces à partir de la bibliothèque. - Activez
azure.eventhub
l’enregistreur d’événements pour collecter des traces à partir de la bibliothèque azure-eventhub principale. - Activez
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
l’enregistreur d’événements pour collecter des traces à partir de la bibliothèque d’objets blob de stockage Azure. - Activez
uamqp
l’enregistreur d’événements pour collecter des traces à partir de la bibliothèque uAMQP sous-jacente. - Activez la trace au niveau de l’image AMQP en définissant
logging_enable=True
lors de la création du client.
Étapes suivantes
Autres exemples de code
Prise en main de nos exemples asynchrones de magasin de points de contrôle EventHubs.
- receive_events_using_checkpoint_store_async.py - Exemple de magasin de points de contrôle d’objet blob EventHubConsumerClient
- receive_events_using_checkpoint_store_storage_api_version_async.py - EventHubConsumerClient avec exemple de magasin de points de contrôle d’objets blob et de version de stockage
Documentation
La documentation de référence est disponible ici.
Fournir des commentaires
Si vous rencontrez des bogues ou si vous avez des suggestions, signalez un problème dans la section Problèmes du projet.
Contribution
Ce projet accepte les contributions et les suggestions. La plupart des contributions vous demandent d’accepter un contrat de licence de contribution (CLA) déclarant que vous avez le droit de nous accorder, et que vous nous accordez réellement, les droits d’utilisation de votre contribution. Pour plus d’informations, visitez https://cla.microsoft.com.
Quand vous envoyez une demande de tirage (pull request), un bot CLA détermine automatiquement si vous devez fournir un contrat CLA et agrémenter la demande de tirage de façon appropriée (par exemple, avec une étiquette ou un commentaire). Suivez simplement les instructions fournies par le bot. Vous ne devez effectuer cette opération qu’une seule fois sur tous les dépôts utilisant notre contrat CLA.
Ce projet a adopté le Code de conduite Open Source de Microsoft. Pour plus d’informations, consultez les Questions fréquentes (FAQ) sur le code de conduite ou envoyez vos questions ou vos commentaires à opencode@microsoft.com.
Azure SDK for Python