Usare foreachBatch per scrivere in sink di dati arbitrari
Questo articolo illustra l'uso di foreachBatch
con Structured Streaming per scrivere l'output di una query di streaming in origini dati che non dispongono di un sink di streaming esistente.
Il modello streamingDF.writeStream.foreachBatch(...)
di codice consente di applicare funzioni batch ai dati di output di ogni micro batch della query di streaming. Le funzioni usate con foreachBatch
accettano due parametri:
- DataFrame con i dati di output di un micro batch.
- ID univoco del micro batch.
È necessario usare foreachBatch
per le operazioni di merge Delta Lake in Structured Streaming. Vedere Upsert dalle query di streaming usando foreachBatch.
Applicare operazioni aggiuntive sul dataframe
Molte operazioni di dataframe e set di dati non sono supportate nei dataframe di streaming perché Spark non supporta la generazione di piani incrementali in questi casi. Usando foreachBatch()
è possibile applicare alcune di queste operazioni a ogni output micro-batch. Ad esempio, è possibile usare foreachBatch()
e l'operazione SQL MERGE INTO
per scrivere l'output delle aggregazioni di streaming in una tabella Delta in modalità di aggiornamento. Per altri dettagli, vedere MERGE INTO.
Importante
-
foreachBatch()
fornisce solo garanzie di scrittura at-least-once. Tuttavia, è possibile usare ilbatchId
fornito alla funzione come modo per deduplicare l'output e ottenere una garanzia di esecuzione unica. In entrambi i casi, è necessario ragionare sulla semantica end-to-end manualmente. -
foreachBatch()
non funziona con la modalità di elaborazione continua perché si basa fondamentalmente sull'esecuzione micro batch di una query di streaming. Se si scrivono dati in modalità continua, usareforeach()
invece .
È possibile richiamare un dataframe vuoto con foreachBatch()
e il codice utente deve essere resiliente per consentire un funzionamento corretto. Di seguito è riportato un esempio:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Modifiche del comportamento per foreachBatch
in Databricks Runtime 14.0
In Databricks Runtime 14.0 e versioni successive nel calcolo configurato con la modalità di accesso condiviso si applicano le modifiche di comportamento seguenti:
- I comandi
print()
scrivono l'output nei log del driver. - Non è possibile accedere al modulo secondario
dbutils.widgets
all'interno della funzione. - Tutti i file, i moduli o gli oggetti a cui si fa riferimento nella funzione devono essere serializzabili e disponibili in Spark.
Riutilizzare le origini dati batch esistenti
Usando foreachBatch()
, è possibile usare writer di dati batch esistenti per sink di dati che potrebbero non avere il supporto di Structured Streaming. Ecco alcuni esempi:
Molte altre origini dati batch possono essere usate da foreachBatch()
. Vedere Connettersi alle origini dati.
Scrivere in più posizioni
Se è necessario scrivere l'output di una query di streaming in più posizioni, Databricks consiglia di usare più writer di streaming strutturato per ottimizzare la parallelizzazione e la velocità effettiva.
L'uso foreachBatch
di per scrivere in più sink serializza l'esecuzione di scritture di streaming, che può aumentare la latenza per ogni micro batch.
Se si usa foreachBatch
per scrivere in più tabelle Delta, vedere scritture di tabelle Idempotenti in foreachBatch.