Condividi tramite


Usare foreachBatch per scrivere in sink di dati arbitrari

Questo articolo illustra l'uso di foreachBatch con Structured Streaming per scrivere l'output di una query di streaming in origini dati che non dispongono di un sink di streaming esistente.

Il modello streamingDF.writeStream.foreachBatch(...) di codice consente di applicare funzioni batch ai dati di output di ogni micro batch della query di streaming. Le funzioni usate con foreachBatch accettano due parametri:

  • DataFrame con i dati di output di un micro batch.
  • ID univoco del micro batch.

È necessario usare foreachBatch per le operazioni di merge Delta Lake in Structured Streaming. Vedere Upsert dalle query di streaming usando foreachBatch.

Applicare operazioni aggiuntive sul dataframe

Molte operazioni di dataframe e set di dati non sono supportate nei dataframe di streaming perché Spark non supporta la generazione di piani incrementali in questi casi. Usando foreachBatch() è possibile applicare alcune di queste operazioni a ogni output micro-batch. Ad esempio, è possibile usare foreachBatch() e l'operazione SQL MERGE INTO per scrivere l'output delle aggregazioni di streaming in una tabella Delta in modalità di aggiornamento. Per altri dettagli, vedere MERGE INTO.

Importante

  • foreachBatch() fornisce solo garanzie di scrittura at-least-once. Tuttavia, è possibile usare il batchId fornito alla funzione come modo per deduplicare l'output e ottenere una garanzia di esecuzione unica. In entrambi i casi, è necessario ragionare sulla semantica end-to-end manualmente.
  • foreachBatch() non funziona con la modalità di elaborazione continua perché si basa fondamentalmente sull'esecuzione micro batch di una query di streaming. Se si scrivono dati in modalità continua, usare foreach() invece .

È possibile richiamare un dataframe vuoto con foreachBatch() e il codice utente deve essere resiliente per consentire un funzionamento corretto. Di seguito è riportato un esempio:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Modifiche del comportamento per foreachBatch in Databricks Runtime 14.0

In Databricks Runtime 14.0 e versioni successive nel calcolo configurato con la modalità di accesso condiviso si applicano le modifiche di comportamento seguenti:

  • I comandi print() scrivono l'output nei log del driver.
  • Non è possibile accedere al modulo secondario dbutils.widgets all'interno della funzione.
  • Tutti i file, i moduli o gli oggetti a cui si fa riferimento nella funzione devono essere serializzabili e disponibili in Spark.

Riutilizzare le origini dati batch esistenti

Usando foreachBatch(), è possibile usare writer di dati batch esistenti per sink di dati che potrebbero non avere il supporto di Structured Streaming. Ecco alcuni esempi:

Molte altre origini dati batch possono essere usate da foreachBatch(). Vedere Connettersi alle origini dati.

Scrivere in più posizioni

Se è necessario scrivere l'output di una query di streaming in più posizioni, Databricks consiglia di usare più writer di streaming strutturato per ottimizzare la parallelizzazione e la velocità effettiva.

L'uso foreachBatch di per scrivere in più sink serializza l'esecuzione di scritture di streaming, che può aumentare la latenza per ogni micro batch.

Se si usa foreachBatch per scrivere in più tabelle Delta, vedere scritture di tabelle Idempotenti in foreachBatch.