Libreria client dell'archivio checkpoint di Azure EventHubs per Python - versione 1.1.4
uso dei BLOB di archiviazione
L'archivio checkpoint di Azure EventHubs viene usato per archiviare checkpoint durante l'elaborazione di eventi da Hub eventi di Azure.
Questo pacchetto dell'archivio checkpoint funziona come pacchetto plug-in per EventHubConsumerClient
. Usa IL BLOB di Archiviazione di Azure come archivio permanente per la gestione di checkpoint e informazioni sulla proprietà della partizione.
Si noti che si tratta di una libreria asincrona, per la versione di sincronizzazione della libreria client dell'archivio checkpoint di Azure EventHubs, vedere azure-eventhub-checkpointstoreblob.
Codice | sorgente Pacchetto (PyPi) | Documentazione | di riferimento sulle API Documentazione | di Azure EventHubsDocumentazione di Archiviazione di Azure
Introduzione
Prerequisiti
Python 3.6 o versione successiva.
Sottoscrizione di Microsoft Azure: Per usare i servizi di Azure, inclusi i Hub eventi di Azure, è necessaria una sottoscrizione. Se non si ha un account Azure esistente, è possibile iscriversi per ottenere una versione di valutazione gratuita o usare i vantaggi del sottoscrittore MSDN quando si crea un account.
Spazio dei nomi di Hub eventi con un hub eventi: Per interagire con Hub eventi di Azure, è anche necessario avere uno spazio dei nomi e un hub eventi disponibili. Se non si ha familiarità con la creazione di risorse di Azure, è possibile seguire la guida dettagliata per la creazione di un hub eventi usando il portale di Azure. È anche possibile trovare istruzioni dettagliate per l'uso dell'interfaccia della riga di comando di Azure, dei Azure PowerShell o dei modelli di Azure Resource Manager (ARM) per creare un hub eventi.
Account di archiviazione di Azure: È necessario avere un account di archiviazione di Azure e creare un contenitore Archiviazione BLOB di Azure Blocca per archiviare i dati del checkpoint con i BLOB. È possibile seguire la guida alla creazione di un account di archiviazione BLOB in blocchi di Azure.
Installare il pacchetto
$ pip install azure-eventhub-checkpointstoreblob-aio
Concetti chiave
Checkpoint
Checkpoint è un processo mediante il quale i lettori contrassegnano o eseguono il commit della propria posizione all'interno di una sequenza di eventi di partizione. Il checkpoint è responsabilità del consumer e si verifica per partizione all'interno di un gruppo di consumer. Questa responsabilità significa che per ogni gruppo di consumer, ogni lettore di partizione deve tenere traccia della posizione corrente nel flusso di eventi e può informare il servizio quando considera completo il flusso di dati. Se un lettore si disconnette da una partizione, quando riconnette inizia a leggere in corrispondenza del checkpoint inviato in precedenza dall’ulitimo lettore di tale partizione in tale gruppo di consumer. Quando il lettore si connette, passa l'offset all'hub eventi per specificare la posizione da cui iniziare la lettura. In questo modo è possibile usare la funzionalità di checkpoint sia per contrassegnare gli eventi come "completi" dalle applicazioni a valle sia per fornire la resilienza in caso di failover tra i lettori in esecuzione in computer diversi. È possibile tornare a dati precedenti specificando un offset inferiore da questo processo di checkpoint. Tramite questo meccanismo il checkpoint consente sia la resilienza del failover che la riproduzione del flusso di eventi.
Offset dei numeri di & sequenza
Entrambi i numeri di sequenza offset & fanno riferimento alla posizione di un evento all'interno di una partizione. È possibile considerarli come un cursore sul lato client. L'offset è la numerazione di byte dell'evento. Il numero di offset/sequenza consente a un consumer di eventi (lettore) di specificare un punto nel flusso di eventi da cui vogliono iniziare a leggere gli eventi. È possibile specificare un timestamp in modo da ricevere eventi accodati solo dopo il timestamp specificato. I consumer sono responsabili di archiviare i propri valori di offset all'esterno del servizio Hub eventi. All'interno di una partizione, ogni evento include un offset, un numero di sequenza e il timestamp di quando è stato accodato.
Esempio
- Creare un hub eventi di Azure
EventHubConsumerClient
- Utilizzare gli eventi tramite un
BlobCheckpointStore
Creare una chiave EventHubConsumerClient
Il modo più semplice per creare un EventHubConsumerClient
oggetto consiste nell'usare una stringa di connessione.
from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")
Per altri modi di creare un EventHubConsumerClient
oggetto , fare riferimento alla libreria EventHubs per altri dettagli.
Utilizzare gli eventi usando un BlobCheckpointStore
oggetto per eseguire il checkpoint
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'
async def on_event(partition_context, event):
# Put your code here.
await partition_context.update_checkpoint(event) # Or update_checkpoint every N events for better performance.
async def main():
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name
)
client = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name,
checkpoint_store=checkpoint_store,
)
async with client:
await client.receive(on_event)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Usare BlobCheckpointStore
con una versione diversa dell'API del servizio di archiviazione di Azure
Alcuni ambienti hanno versioni diverse dell'API del servizio di archiviazione di Azure.
BlobCheckpointStore
per impostazione predefinita usa l'API del servizio di archiviazione versione 2019-07-07. Per usarlo in base a una versione diversa, specificare api_version
quando si crea l'oggetto BlobCheckpointStore
.
Risoluzione dei problemi
Generale
L'abilitazione della registrazione sarà utile per risolvere i problemi.
Registrazione
- Abilitare
azure.eventhub.extensions.checkpointstoreblobaio
il logger per raccogliere tracce dalla libreria. - Abilitare
azure.eventhub
il logger per raccogliere tracce dalla libreria azure-eventhub principale. - Abilitare
azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage
il logger per raccogliere tracce dalla libreria BLOB di archiviazione di Azure. - Abilitare
uamqp
il logger per raccogliere tracce dalla libreria uAMQP sottostante. - Abilitare la traccia a livello di frame AMQP impostando
logging_enable=True
durante la creazione del client.
Passaggi successivi
Altro codice di esempio
Introduzione agli esempi asincroni dell'archivio checkpoint di EventHubs.
- receive_events_using_checkpoint_store_async.py - Esempio di eventHubConsumerClient con archivio checkpoint BLOB
- receive_events_using_checkpoint_store_storage_api_version_async.py - EventHubConsumerClient con l'archivio blob e l'esempio di versione di archiviazione
Documentazione
La documentazione di riferimento è disponibile qui.
Commenti e suggerimenti
Se si verificano bug o si hanno suggerimenti, segnalare un problema nella sezione Problemi del progetto.
Contributo
In questo progetto sono benvenuti i contributi e i suggerimenti. Per la maggior parte dei contenuti è necessario sottoscrivere un contratto di licenza di collaborazione (CLA, Contributor License Agreement) che stabilisce che l'utente ha il diritto di concedere, e di fatto concede a Microsoft i diritti d'uso del suo contributo. Per informazioni dettagliate, vedere https://cla.microsoft.com.
Quando si invia una richiesta pull, un bot CLA determina automaticamente se è necessario specificare un contratto CLA e completare la richiesta pull in modo appropriato (ad esempio con un'etichetta e un commento). Seguire le istruzioni specificate dal bot. È sufficiente eseguire questa operazione una sola volta per tutti i repository che usano il contratto CLA Microsoft.
Questo progetto ha adottato il Codice di comportamento di Microsoft per l'open source. Per altre informazioni, vedere Code of Conduct FAQ (Domande frequenti sul Codice di comportamento Open Source di Microsoft) oppure contattare opencode@microsoft.com per eventuali altre domande o commenti.
Azure SDK for Python