ForeachBatch gebruiken om naar willekeurige gegevenssinks te schrijven
In dit artikel wordt het gebruik foreachBatch
met Structured Streaming besproken om de uitvoer van een streamingquery te schrijven naar gegevensbronnen die geen bestaande streaming-sink hebben.
Met het codepatroon streamingDF.writeStream.foreachBatch(...)
kunt u batchfuncties toepassen op de uitvoergegevens van elke microbatch van de streamingquery. Functies die met foreachBatch
worden gebruikt, accepteren twee parameters.
- Een DataFrame met de uitvoergegevens van een microbatch.
- De unieke id van de microbatch.
U moet voor Delta Lake-samenvoegbewerkingen in Structured Streaming gebruiken foreachBatch
. Zie Upsert van streamingquery's met behulp van foreachBatch.
Aanvullende DataFrame-bewerkingen toepassen
Veel dataframe- en gegevenssetbewerkingen worden niet ondersteund in streaming DataFrames, omdat Spark in die gevallen geen ondersteuning biedt voor het genereren van incrementele plannen. Met behulp foreachBatch()
van kunt u een aantal van deze bewerkingen toepassen op elke microbatch-uitvoer. U kunt bijvoorbeeld foreachBath()
en de SQL MERGE INTO
-bewerking gebruiken om de uitvoer van streamingaggregaties naar een Delta-table te schrijven in update-modus. Zie meer informatie in MERGE INTO.
Belangrijk
-
foreachBatch()
biedt slechts ten minste eenmaal schrijfgaranties. U kunt echter debatchId
die aan de functie is verstrekt, gebruiken om de uitvoer te ontdubbelen en get een garantie van exact één keer. In beide gevallen moet u zelf redeneren over de end-to-end semantiek. -
foreachBatch()
werkt niet met de modus voor continue verwerking, omdat deze fundamenteel afhankelijk is van de microbatchuitvoering van een streamingquery. Als u gegevens schrijft in de doorlopende modus, gebruiktforeach()
u in plaats daarvan.
Een leeg dataframe kan worden aangeroepen met foreachBatch()
en gebruikerscode moet tolerant zijn om de juiste werking mogelijk te maken. Hier kunt u een voorbeeld bekijken:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Gedragswijzigingen voor foreachBatch
Databricks Runtime 14.0
In Databricks Runtime 14.0 en hoger voor berekeningen die zijn geconfigureerd met de modus voor gedeelde toegang, zijn de volgende gedragswijzigingen van toepassing:
-
print()
opdrachten schrijven uitvoer naar de stuurprogrammalogboeken. - U hebt geen toegang tot de
dbutils.widgets
submodule in de functie. - Bestanden, modules of objecten waarnaar in de functie wordt verwezen, moeten serialiseren en beschikbaar zijn in Spark.
Bestaande batchgegevensbronnen opnieuw gebruiken
Met behulp van foreachBatch()
kunt u bestaande batchgegevensschrijvers gebruiken voor gegevenssinks die mogelijk geen ondersteuning voor Structured Streaming hebben. Enkele voorbeelden:
Veel andere batchgegevensbronnen kunnen worden gebruikt uit foreachBatch()
. Zie Verbinding maken met gegevensbronnen.
Schrijven naar meerdere locaties
Als u de uitvoer van een streamingquery naar meerdere locaties moet schrijven, raadt Databricks aan om meerdere structured streaming-schrijvers te gebruiken voor de beste parallelle uitvoering en doorvoer.
Het gebruik foreachBatch
om naar meerdere sinks te schrijven serialiseert de uitvoering van streaming-schrijfbewerkingen, waardoor de latentie voor elke microbatch kan toenemen.
Als u foreachBatch
gebruikt om naar meerdere Delta-tableste schrijven, raadpleegt u Idempotent table schrijfbewerkingen in foreachBatch.