Delen via


ForeachBatch gebruiken om naar willekeurige gegevenssinks te schrijven

In dit artikel wordt het gebruik foreachBatch met Structured Streaming besproken om de uitvoer van een streamingquery te schrijven naar gegevensbronnen die geen bestaande streaming-sink hebben.

Met het codepatroon streamingDF.writeStream.foreachBatch(...) kunt u batchfuncties toepassen op de uitvoergegevens van elke microbatch van de streamingquery. Functies die worden gebruikt met foreachBatch twee parameters:

  • Een DataFrame met de uitvoergegevens van een microbatch.
  • De unieke id van de microbatch.

U moet voor Delta Lake-samenvoegbewerkingen in Structured Streaming gebruiken foreachBatch . Zie Upsert van streamingquery's met behulp van foreachBatch.

Aanvullende DataFrame-bewerkingen toepassen

Veel dataframe- en gegevenssetbewerkingen worden niet ondersteund in streaming DataFrames, omdat Spark in die gevallen geen ondersteuning biedt voor het genereren van incrementele plannen. Met behulp foreachBatch() van kunt u een aantal van deze bewerkingen toepassen op elke microbatch-uitvoer. U kunt bijvoorbeeld en de SQL-bewerking MERGE INTO gebruiken foreachBath() om de uitvoer van streamingaggregaties naar een Delta-tabel te schrijven in de updatemodus. Zie meer informatie in MERGE INTO.

Belangrijk

  • foreachBatch() biedt slechts ten minste eenmaal schrijfgaranties. U kunt de batchId opgegeven functie echter gebruiken als manier om de uitvoer te ontdubbelen en exact één keer een garantie te krijgen. In beide gevallen moet u zelf redeneren over de end-to-end semantiek.
  • foreachBatch() werkt niet met de modus voor continue verwerking, omdat deze fundamenteel afhankelijk is van de microbatchuitvoering van een streamingquery. Als u gegevens schrijft in de doorlopende modus, gebruikt foreach() u in plaats daarvan.

Een leeg dataframe kan worden aangeroepen met foreachBatch() en gebruikerscode moet tolerant zijn om de juiste werking mogelijk te maken. Hier kunt u een voorbeeld bekijken:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Gedragswijzigingen voor foreachBatch Databricks Runtime 14.0

In Databricks Runtime 14.0 en hoger voor berekeningen die zijn geconfigureerd met de modus voor gedeelde toegang, zijn de volgende gedragswijzigingen van toepassing:

  • print() opdrachten schrijven uitvoer naar de stuurprogrammalogboeken.
  • U hebt geen toegang tot de dbutils.widgets submodule in de functie.
  • Bestanden, modules of objecten waarnaar in de functie wordt verwezen, moeten serialiseren en beschikbaar zijn in Spark.

Bestaande batchgegevensbronnen opnieuw gebruiken

Met behulp van foreachBatch()kunt u bestaande batchgegevensschrijvers gebruiken voor gegevenssinks die mogelijk geen ondersteuning voor Structured Streaming hebben. Enkele voorbeelden:

Veel andere batchgegevensbronnen kunnen worden gebruikt uit foreachBatch(). Zie Verbinding maken met gegevensbronnen.

Schrijven naar meerdere locaties

Als u de uitvoer van een streamingquery naar meerdere locaties moet schrijven, raadt Databricks aan om meerdere structured streaming-schrijvers te gebruiken voor de beste parallelle uitvoering en doorvoer.

Het gebruik foreachBatch om naar meerdere sinks te schrijven serialiseert de uitvoering van streaming-schrijfbewerkingen, waardoor de latentie voor elke microbatch kan toenemen.

Als u wel foreachBatch naar meerdere Delta-tabellen schrijft, raadpleegt u Idempotent table writes in foreachBatch.