Dela via


Använda foreachBatch för att skriva till godtyckliga datamottagare

Den här artikeln beskriver hur du använder foreachBatch structured streaming för att skriva utdata från en strömmande fråga till datakällor som inte har någon befintlig direktuppspelningsmottagare.

Med kodmönstret streamingDF.writeStream.foreachBatch(...) kan du använda batchfunktioner för utdata för varje mikrobatch i strömningsfrågan. Funktioner som används med foreachBatch tar två parameters:

  • En dataram som har utdata från en mikrobatch.
  • Mikrobatchens unika ID.

Du måste använda foreachBatch för Delta Lake-sammanslagningsåtgärder i Strukturerad direktuppspelning. Se Upsert från strömmande frågor med foreachBatch.

Tillämpa ytterligare DataFrame-åtgärder

Många DataFrame- och Dataset-åtgärder stöds inte i strömmande DataFrames eftersom Spark inte stöder generering av inkrementella planer i dessa fall. Med hjälp foreachBatch() av kan du tillämpa några av dessa åtgärder på varje mikrobatchutdata. Du kan till exempel använda foreachBath() och åtgärden SQL MERGE INTO för att skriva resultaten från strömmande aggregeringar till en Delta table i update-läge. Mer information finns i MERGE INTO.

Viktigt!

  • foreachBatch() ger endast skrivgarantier minst en gång. Du kan dock använda batchId, som tillhandahålls till funktionen, som ett sätt att deduplicera utdata och get för att garantera att varje operation utförs exakt en gång. I båda fallen måste du resonera om semantiken från slutpunkt till slutpunkt själv.
  • foreachBatch()fungerar inte med läget för kontinuerlig bearbetning eftersom det i grunden förlitar sig på mikrobatchkörningen av en strömmande fråga. Om du skriver data i kontinuerligt läge använder du foreach() i stället.

En tom dataram kan anropas med foreachBatch() och användarkoden måste vara elastisk för att möjliggöra korrekt åtgärd. Ett exempel på detta visas här:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Beteendeändringar för foreachBatch i Databricks Runtime 14.0

I Databricks Runtime 14.0 och senare vid beräkning som konfigurerats med läget för delad åtkomst gäller följande beteendeändringar:

  • print() kommandon skriver utdata till drivrutinsloggarna.
  • Du kan inte komma åt undermodulen dbutils.widgets i funktionen.
  • Alla filer, moduler eller objekt som refereras till i funktionen måste vara serialiserbara och tillgängliga på Spark.

Återanvända befintliga batchdatakällor

Med kan foreachBatch()du använda befintliga batchdataskrivare för datamottagare som kanske inte har stöd för structured streaming. Några exempel:

Många andra batchdatakällor kan användas från foreachBatch(). Se Ansluta till datakällor.

Skriva till flera platser

Om du behöver skriva utdata från en strömmande fråga till flera platser rekommenderar Databricks att du använder flera strukturerade strömningsskrivare för bästa parallellisering och dataflöde.

Om du använder foreachBatch för att skriva till flera mottagare serialiseras körningen av strömmande skrivningar, vilket kan öka svarstiden för varje mikrobatch.

Om du använder foreachBatch för att skriva till flera Delta-tables, se Idempotent table skrivningar i foreachBatch.