Uw eerste structured streaming-workload uitvoeren
Dit artikel bevat codevoorbeelden en uitleg van basisconcepten die nodig zijn om uw eerste Structured Streaming-query's uit te voeren op Azure Databricks. U kunt Structured Streaming gebruiken voor werkbelastingen in bijna realtime en incrementele verwerking.
Structured Streaming is een van de verschillende technologieën die stroomstreamingtabellen in Delta Live Tables mogelijk maken. Databricks raadt het gebruik van Delta Live Tables aan voor alle nieuwe ETL-, opname- en Structured Streaming-workloads. Zie Wat is Delta Live Tables?
Notitie
Hoewel Delta Live Tables een enigszins gewijzigde syntaxis biedt voor het declareren van streamingtabellen, is de algemene syntaxis voor het configureren van streaming-lees- en transformaties van toepassing op alle streaming-use cases in Azure Databricks. Delta Live Tables vereenvoudigt het streamen ook door statusinformatie, metagegevens en talloze configuraties te beheren.
Automatisch laden gebruiken om streaminggegevens uit objectopslag te lezen
In het volgende voorbeeld ziet u hoe u JSON-gegevens laadt met Auto Loader, die wordt gebruikt cloudFiles
om indeling en opties aan te geven. Met de schemaLocation
optie kunt u schemadeductie en evolutie inschakelen. Plak de volgende code in een Databricks-notebookcel en voer de cel uit om een streaming DataFrame met de naam raw_df
te maken:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Net als bij andere leesbewerkingen in Azure Databricks worden bij het configureren van een streaming-leesbewerking geen gegevens geladen. U moet een actie activeren voor de gegevens voordat de stream begint.
Notitie
Als u een streaming DataFrame aanroept display()
, wordt een streamingtaak gestart. Voor de meeste gebruiksscenario's voor gestructureerd streamen moet de actie die een stroom activeert gegevens naar een sink schrijven. Zie Overwegingen voor productie voor gestructureerd streamen.
Een streamingtransformatie uitvoeren
Structured Streaming ondersteunt de meeste transformaties die beschikbaar zijn in Azure Databricks en Spark SQL. U kunt zelfs MLflow-modellen als UDF's laden en streamingvoorspellingen doen als transformatie.
In het volgende codevoorbeeld wordt een eenvoudige transformatie voltooid om de opgenomen JSON-gegevens te verrijken met aanvullende informatie met behulp van Spark SQL-functies:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Het resulterende transformed_df
bevat query-instructies voor het laden en transformeren van elke record wanneer deze in de gegevensbron binnenkomt.
Notitie
Structured Streaming behandelt gegevensbronnen als niet-gebonden of oneindige gegevenssets. Daarom worden sommige transformaties niet ondersteund in Structured Streaming-workloads, omdat hiervoor een oneindig aantal items moet worden gesorteerd.
Voor de meeste aggregaties en veel joins is het beheren van statusgegevens met watermerken, vensters en uitvoermodus vereist. Zie Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren.
Incrementele batch-schrijfbewerkingen uitvoeren naar Delta Lake
In het volgende voorbeeld wordt naar Delta Lake geschreven met behulp van een opgegeven bestandspad en controlepunt.
Belangrijk
Zorg er altijd voor dat u een unieke controlepuntlocatie opgeeft voor elke streamingschrijver die u configureert. Het controlepunt biedt de unieke identiteit voor uw stream, waarbij alle verwerkte records en statusgegevens worden bijgehouden die zijn gekoppeld aan uw streamingquery.
Met availableNow
de instelling voor de trigger wordt gestructureerd streamen geïnstrueerd om alle eerder niet-verwerkte records uit de brongegevensset te verwerken en vervolgens af te sluiten, zodat u de volgende code veilig kunt uitvoeren zonder dat u zich zorgen hoeft te maken over het verlaten van een stroom die wordt uitgevoerd:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
In dit voorbeeld komen er geen nieuwe records binnen in de gegevensbron. Herhaal de uitvoering van deze code neemt dus geen nieuwe records op.
Waarschuwing
Uitvoering van gestructureerd streamen kan voorkomen dat automatische beëindiging rekenresources afsluit. Als u onverwachte kosten wilt voorkomen, moet u streamingquery's beëindigen.
Gegevens lezen uit Delta Lake, transformeren en schrijven naar Delta Lake
Delta Lake biedt uitgebreide ondersteuning voor het werken met Structured Streaming als bron en een sink. Zie lees- en schrijfbewerkingen voor Delta-tabellen.
In het volgende voorbeeld ziet u een voorbeeldsyntaxis om alle nieuwe records uit een Delta-tabel incrementeel te laden, deze samen te voegen met een momentopname van een andere Delta-tabel en deze naar een Delta-tabel te schrijven:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
U moet over de juiste machtigingen beschikken voor het lezen van brontabellen en schrijven naar doeltabellen en de opgegeven controlepuntlocatie. Vul alle parameters in die worden aangeduid met punthaken (<>
) met behulp van de relevante waarden voor uw gegevensbronnen en sinks.
Notitie
Delta Live Tables biedt een volledig declaratieve syntaxis voor het maken van Delta Lake-pijplijnen en beheert automatisch eigenschappen zoals triggers en controlepunten. Zie Wat is Delta Live Tables?
Gegevens lezen uit Kafka, transformeren en schrijven naar Kafka
Apache Kafka en andere berichtenbussen bieden een aantal van de laagste latentie die beschikbaar is voor grote gegevenssets. U kunt Azure Databricks gebruiken om transformaties toe te passen op gegevens die zijn opgenomen vanuit Kafka en vervolgens gegevens terug te schrijven naar Kafka.
Notitie
Door gegevens naar cloudobjectopslag te schrijven, wordt extra latentieoverhead toegevoegd. Als u gegevens wilt opslaan uit een berichtenbus in Delta Lake, maar de laagste latentie nodig hebt voor streamingworkloads, raadt Databricks u aan afzonderlijke streamingtaken te configureren om gegevens op te nemen in lakehouse en bijna realtime transformaties toe te passen voor downstream-berichtenbus-sinks.
In het volgende codevoorbeeld ziet u een eenvoudig patroon om gegevens uit Kafka te verrijken door deze samen te voegen met gegevens in een Delta-tabel en vervolgens terug te schrijven naar Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
U moet over de juiste machtigingen beschikken voor toegang tot uw Kafka-service. Vul alle parameters in die worden aangeduid met punthaken (<>
) met behulp van de relevante waarden voor uw gegevensbronnen en sinks. Zie Stream-verwerking met Apache Kafka en Azure Databricks.