Quickstart: Event Hubs-gegevens vastleggen in Azure Storage en lezen met behulp van Python (azure-eventhub)
U kunt een Event Hub zo configureren dat de gegevens die naar de Event Hub worden verzonden, worden vastgelegd in een Azure-opslagaccount of Azure Data Lake Storage Gen 1 of Gen 2. In dit artikel leest u hoe u Python-code kunt schrijven om gebeurtenissen te verzenden naar een Event Hub en de vastgelegde gegevens te lezen vanuit Azure Blob Storage. Zie Overzicht van de functie Capture van Event Hubs voor meer informatie over deze functie.
In deze handleiding wordt gebruikgemaakt van de Azure Python-SDK om de functie Capture te demonstreren. De app sender.py verzendt gesimuleerde omgevingstelemetrie naar Event Hubs in JSON-indeling. De Event Hub gebruikt de functie Capture om deze gegevens in batches naar Blob Storage te schrijven. De app capturereader.py leest deze blobs en maakt een toe te voegen bestand voor elk apparaat. De app schrijft de gegevens vervolgens in CSV-bestanden.
In deze snelstart, gaat u het volgende doen:
- Een Azure Blob-opslagaccount en container maken in Azure Portal.
- Een Event Hubs-naamruimte maken behulp van Azure Portal.
- Een Event Hub maken waarvoor de functie Capture is ingeschakeld en deze koppelen aan uw opslagaccount.
- Gegevens naar uw event hub verzenden met behulp van een Python-script.
- Bestanden uit Event Hubs Capture lezen en verwerken met behulp van een ander Python-script.
Vereisten
Python 3.8 of hoger, waarbij PIP is geïnstalleerd en bijgewerkt.
Een Azure-abonnement. Als u nog geen abonnement hebt, maakt u een gratis account voordat u begint.
Een actieve Azure Event Hubs-naamruimte en Event Hub. Maak een Event Hubs-naamruimte en een Event Hub. Noteer de naam van de Event Hubs-naamruimte, de naam van de Event Hub en de primaire toegangssleutel voor de naamruimte. Zie Een Event Hubs-verbindingsreeks ophalen om een toegangssleutel op te halen. De naam van de standaardsleutel is RootManageSharedAccessKey. Voor deze quickstart hebt u alleen de primaire sleutel nodig. De verbindingsreeks hebt u niet nodig.
Een Azure-opslagaccount, een blobcontainer in het opslagaccount en een verbindingsreeks naar het opslagaccount. Als u deze items niet hebt, voert u de volgende stappen uit:
- Een Azure-opslagaccount maken
- Een blobcontainer in het opslagaccount maken
- De verbindingsreeks voor het opslagaccount ophalen
Zorg ervoor dat u de verbindingsreeks en de containernaam vastlegt voor later gebruik in deze quickstart.
Capture-functie inschakelen voor de Event Hub
De functie Capture inschakelen voor de Event Hub. Volg hiervoor de instructies in Event Hubs Capture inschakelen met behulp van Azure Portal. Selecteer het opslagaccount en de blobcontainer die u in de vorige stap hebt gemaakt. Selecteer Avro voor de serialisatie-indeling van uitvoerevenementen.
Een Python-script maken om gebeurtenissen naar uw Event Hub te verzenden
In deze sectie maakt u een Python-script waarmee 200 gebeurtenissen (10 apparaten x 20 gebeurtenissen) naar een Event Hub worden verzonden. Deze gebeurtenissen zijn een sample van een omgevingsmeting die wordt verzonden in JSON-indeling.
Open uw favoriete Python-editor, bijvoorbeeld Visual Studio Code.
Maak een script met de naam sender.py.
Plak de volgende code in sender.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()
Wijzig de volgende waarden in de scripts:
- Vervang
EVENT HUBS NAMESPACE CONNECTION STRING
door de verbindingsreeks voor uw Event Hubs-naamruimte. - Vervang
EVENT HUB NAME
door de naam van uw Event Hub.
- Vervang
Voer het script uit om gebeurtenissen naar de Event Hub te verzenden.
In de Azure-portal kunt u controleren of de Event Hub de berichten heeft ontvangen. Schakel over naar de weergave Berichten in de sectie Metrische gegevens. U moet de pagina vernieuwen om de grafiek bij te werken. Het kan een paar seconden duren voordat op de pagina wordt weergegeven dat de berichten zijn ontvangen.
Een Python-script maken om uw Capture-bestanden te lezen
In dit voorbeeld worden de vastgelegde gegevens opgeslagen in Azure Blob-opslag. Met het script in deze sectie worden de vastgelegde gegevensbestanden van uw Azure-opslagaccount gelezen en worden CSV-bestanden gegenereerd die u eenvoudig kunt openen en weergeven. U ziet 10 bestanden in de huidige werkmap van de toepassing. Deze bestanden bevatten de omgevingsmetingen voor de 10 apparaten.
Maak in uw Python-editor een script met de naam capturereader.py. Met dit script worden de vastgelegde bestanden gelezen en wordt voor elk apparaat een bestand gemaakt om uitsluitend de gegevens voor dat apparaat in te schrijven.
Plak de volgende code in capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()
Vervang
AZURE STORAGE CONNECTION STRING
door de verbindingsreeks voor uw Azure-opslagaccount. De naam van de container die u in deze quickstart hebt gemaakt, is Capture. Als u een andere naam voor de container hebt gebruikt, vervangt u capture door de naam van de container in het opslagaccount.
De scripts uitvoeren
Open een opdrachtprompt met Python in het pad en voer vervolgens de volgende opdrachten uit om pakketten te installeren die voor Python vereist zijn:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3
Wijzig uw map in de map waarin u sender.py en capturereader.py hebt opgeslagen en voer de volgende opdracht uit:
python sender.py
Met deze opdracht wordt een nieuw Python-proces gestart om het verzendprogramma uit te voeren.
Wacht enkele minuten tot de Capture is uitgevoerd en voer vervolgens de volgende opdracht uit in uw oorspronkelijke opdrachtvenster:
python capturereader.py
Deze Capture-processor maakt gebruik van de lokale map om alle blobs uit het opslagaccount en de container in te downloaden. Het verwerkt bestanden die niet leeg zijn en schrijft de resultaten als CSV-bestanden naar de lokale map.
Volgende stappen
Bekijk Python-voorbeelden in GitHub.