Letture e scritture di streaming di tabelle delta
Delta Lake è completamente integrato con Spark Structured Streaming tramite readStream
e writeStream
. Delta Lake supera molte delle limitazioni in genere associate ai sistemi e ai file di streaming, tra cui:
- Unione di file di piccole dimensioni prodotti dall'inserimento a bassa latenza.
- Gestione dell'elaborazione "esattamente una volta" con più flussi (o processi batch simultanei).
- Individuazione efficiente dei file nuovi quando si usano file come origine per un flusso.
Nota
Questo articolo descrive l'uso di tabelle Delta Lake come origini di streaming e sink. Per informazioni su come caricare dati usando tabelle di streaming in Databricks SQL, vedere Caricare dati usando tabelle di streaming in Databricks SQL.
Per informazioni sui join statici di flusso con Delta Lake, vedere Join statici di Stream.
Tabella Delta come origine
Structured Streaming legge in modo incrementale le tabelle Delta. Mentre una query di streaming è attiva su una tabella Delta, i nuovi record vengono elaborati in modo idempotente quando le nuove versioni della tabella eseguono il commit nella tabella di origine.
Gli esempi di codice seguenti illustrano la configurazione di una lettura in streaming usando il nome della tabella o il percorso del file.
Python
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Scala
spark.readStream.table("table_name")
spark.readStream.load("/path/to/table")
Importante
Se lo schema per una tabella Delta viene modificato dopo l'inizio di una lettura di streaming sulla tabella, la query avrà esito negativo. Per la maggior parte delle modifiche dello schema, è possibile riavviare lo streaming per risolvere la mancata corrispondenza dello schema e continuare l'elaborazione.
In Databricks Runtime 12.2 LTS e versioni successive non è possibile eseguire lo streaming da una tabella Delta con mapping di colonne abilitato che ha subito un'evoluzione dello schema non additivi, ad esempio la ridenominazione o l'eliminazione di colonne. Per informazioni dettagliate, vedere Streaming con mapping di colonne e modifiche dello schema.
Limitare la velocità di input
Per controllare i micro batch sono disponibili le opzioni seguenti:
maxFilesPerTrigger
: numero di nuovi file da considerare in ogni micro batch. Il valore predefinito è 1000.maxBytesPerTrigger
: quantità di dati elaborati in ogni micro batch. Questa opzione imposta un valore "soft max", ovvero un batch elabora approssimativamente questa quantità di dati e può elaborare più del limite per far avanzare la query di streaming nei casi in cui l'unità di input più piccola è maggiore di questo limite. Questa impostazione non è impostata per impostazione predefinita.
Se si usa maxBytesPerTrigger
insieme a maxFilesPerTrigger
, il micro batch elabora i dati fino a quando non viene raggiunto il maxFilesPerTrigger
limite o maxBytesPerTrigger
.
Nota
Nei casi in cui le transazioni della tabella di origine vengono pulite a causa della logRetentionDuration
configurazione e la query di streaming tenta di elaborare tali versioni, per impostazione predefinita la query non riesce a evitare la perdita di dati. È possibile impostare l'opzione su false
per ignorare i dati persi e continuare l'elaborazionefailOnDataLoss
.
Trasmettere un feed Delta Lake Change Data Capture (CDC)
Il feed di dati delle modifiche delta Lake registra le modifiche apportate a una tabella Delta, inclusi gli aggiornamenti e le eliminazioni. Se abilitata, è possibile eseguire lo streaming da un feed di dati delle modifiche e scrivere la logica per elaborare inserimenti, aggiornamenti ed eliminazioni in tabelle downstream. Sebbene l'output dei dati del feed di dati delle modifiche sia leggermente diverso dalla tabella Delta descritto, offre una soluzione per la propagazione di modifiche incrementali alle tabelle downstream in un'architettura medallion.
Importante
In Databricks Runtime 12.2 LTS e versioni successive non è possibile eseguire lo streaming dal feed di dati delle modifiche per una tabella Delta con mapping di colonne abilitato che ha subito un'evoluzione dello schema non additivi, ad esempio la ridenominazione o l'eliminazione di colonne. Vedere Streaming con mapping di colonne e modifiche dello schema.
Ignorare gli aggiornamenti e le eliminazioni
Structured Streaming non gestisce l'input che non è un accodamento e genera un'eccezione se vengono apportate modifiche alla tabella utilizzata come origine. Esistono due strategie principali per gestire le modifiche che non possono essere propagate automaticamente a valle:
- È possibile eliminare l'output e il checkpoint e riavviare il flusso dall'inizio.
- È possibile impostare una di queste due opzioni:
ignoreDeletes
: ignora le transazioni che eliminano i dati in corrispondenza dei limiti della partizione.skipChangeCommits
: ignora le transazioni che eliminano o modificano i record esistenti.skipChangeCommits
includeignoreDeletes
.
Nota
In Databricks Runtime 12.2 LTS e versioni successive depreca skipChangeCommits
l'impostazione ignoreChanges
precedente. In Databricks Runtime 11.3 LTS e versioni precedenti ignoreChanges
è l'unica opzione supportata.
La semantica per ignoreChanges
differisce notevolmente da skipChangeCommits
. Con l'abilitazione di ignoreChanges
, i file di dati riscritti nella tabella di origine vengono rigenerati dopo un'operazione di modifica dei dati, ad esempio UPDATE
, MERGE INTO
, DELETE
(all’interno di partizioni) o OVERWRITE
. Le righe invariate vengono spesso generate insieme a quelle nuove, quindi i consumer downstream devono essere in grado di gestire i duplicati. Le eliminazioni non vengono propagate downstream. ignoreChanges
include ignoreDeletes
.
skipChangeCommits
ignora completamente le operazioni di modifica dei file. I file di dati riscritti nella tabella di origine a causa dell'operazione di modifica dei dati, ad esempio UPDATE
, MERGE INTO
, DELETE
e OVERWRITE
vengono ignorati completamente. Per riflettere le modifiche nelle tabelle di origine upstream, è necessario implementare una logica separata per propagare tali modifiche.
I carichi di lavoro configurati con ignoreChanges
continuano a funzionare usando la semantica nota, ma Databricks consiglia di usare skipChangeCommits
per tutti i nuovi carichi di lavoro. La migrazione dei carichi di lavoro con ignoreChanges
per richiede skipChangeCommits
la logica di refactoring.
Esempio
Si supponga, ad esempio, di avere una tabella user_events
con date
colonne , user_email
e action
partizionate da date
. Si estrae dalla tabella ed è necessario eliminarli a causa del GDPR.You stream out of the user_events
table and you need to delete data from it due to GDPR.
Quando si elimina in corrispondenza dei limiti della partizione, ovvero si WHERE
trova in una colonna di partizione, i file sono già segmentati per valore in modo che l'eliminazione elimini solo i file dai metadati. Quando si elimina un'intera partizione di dati, è possibile usare quanto segue:
spark.readStream
.option("ignoreDeletes", "true")
.table("user_events")
Se si eliminano dati in più partizioni (in questo esempio, il filtro in base user_email
a ), usare la sintassi seguente:
spark.readStream
.option("skipChangeCommits", "true")
.table("user_events")
Se si aggiorna un user_email
oggetto con l'istruzione UPDATE
, il file contenente l'oggetto user_email
in questione viene riscritto. Usare skipChangeCommits
per ignorare i file di dati modificati.
Specificare la posizione iniziale
È possibile usare le opzioni seguenti per specificare il punto iniziale dell'origine di streaming Delta Lake senza elaborare l'intera tabella.
startingVersion
: versione delta Lake da cui iniziare. Databricks consiglia di omettere questa opzione per la maggior parte dei carichi di lavoro. Quando non è impostato, il flusso inizia dalla versione disponibile più recente, incluso uno snapshot completo della tabella in quel momento.Se specificato, il flusso legge tutte le modifiche apportate alla tabella Delta a partire dalla versione specificata (inclusa). Se la versione specificata non è più disponibile, l'avvio del flusso non riesce. È possibile ottenere le versioni di commit dalla
version
colonna dell'output del comando DESCRIBE HISTORY .Per restituire solo le modifiche più recenti, specificare
latest
.startingTimestamp
: timestamp da cui iniziare. Tutte le modifiche apportate alla tabella di cui è stato eseguito il commit o dopo il timestamp (inclusivo) vengono lette dal lettore di streaming. Se il timestamp specificato precede tutti i commit della tabella, la lettura di streaming inizia con il timestamp meno recente disponibile. Uno dei valori possibili:- Stringa di timestamp. Ad esempio:
"2019-01-01T00:00:00.000Z"
. - Stringa di data. Ad esempio:
"2019-01-01"
.
- Stringa di timestamp. Ad esempio:
Non è possibile impostare entrambe le opzioni contemporaneamente. Diventano effettive solo quando si avvia una nuova query di streaming. Se una query di streaming è stata avviata e lo stato di avanzamento è stato registrato nel relativo checkpoint, queste opzioni vengono ignorate.
Importante
Anche se è possibile avviare l'origine di streaming da una versione o un timestamp specificato, lo schema dell'origine di streaming è sempre lo schema più recente della tabella Delta. È necessario assicurarsi che non vi sia alcuna modifica dello schema incompatibile alla tabella Delta dopo la versione o il timestamp specificati. In caso contrario, l'origine di streaming potrebbe restituire risultati non corretti durante la lettura dei dati con uno schema non corretto.
Esempio
Si supponga, ad esempio, di avere una tabella user_events
. Se si desidera leggere le modifiche dalla versione 5, usare:
spark.readStream
.option("startingVersion", "5")
.table("user_events")
Se si desidera leggere le modifiche dal 2018-10-18, usare:
spark.readStream
.option("startingTimestamp", "2018-10-18")
.table("user_events")
Elaborare lo snapshot iniziale senza eliminare i dati
Nota
Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive. Questa funzionalità è disponibile in anteprima pubblica.
Quando si usa una tabella Delta come origine di flusso, la query elabora innanzitutto tutti i dati presenti nella tabella. La tabella Delta in questa versione è denominata snapshot iniziale. Per impostazione predefinita, i file di dati della tabella Delta vengono elaborati in base all'ultima modifica del file. Tuttavia, l'ora dell'ultima modifica non rappresenta necessariamente l'ordine di ora dell'evento del record.
In una query di streaming con stato con una filigrana definita, l'elaborazione dei file in base al tempo di modifica può comportare l'elaborazione dei record nell'ordine errato. Ciò potrebbe causare l'eliminazione dei record come eventi tardivi in base alla filigrana.
È possibile evitare il problema di rilascio dei dati abilitando l'opzione seguente:
- withEventTimeOrder: indica se lo snapshot iniziale deve essere elaborato con l'ordine dell'ora dell'evento.
Con l'ordine di tempo dell'evento abilitato, l'intervallo di tempo dell'evento dei dati dello snapshot iniziale viene suddiviso in bucket di tempo. Ogni micro batch elabora un bucket filtrando i dati all'interno dell'intervallo di tempo. Le opzioni di configurazione maxFilesPerTrigger e maxBytesPerTrigger sono ancora applicabili per controllare le dimensioni del microbatch, ma solo in modo approssimativo a causa della natura dell'elaborazione.
L'immagine seguente mostra questo processo:
Informazioni rilevanti su questa funzionalità:
- Il problema di eliminazione dei dati si verifica solo quando lo snapshot Delta iniziale di una query di streaming con stato viene elaborato nell'ordine predefinito.
- Non è possibile modificare
withEventTimeOrder
dopo l'avvio della query di flusso durante l'elaborazione dello snapshot iniziale. Per riavviare conwithEventTimeOrder
la modifica, è necessario eliminare il checkpoint. - Se si esegue una query di flusso con conEventTimeOrder abilitato, non è possibile eseguirne il downgrade a una versione DBR che non supporta questa funzionalità fino al completamento dell'elaborazione iniziale dello snapshot. Se è necessario effettuare il downgrade, è possibile attendere il completamento dello snapshot iniziale oppure eliminare il checkpoint e riavviare la query.
- Questa funzionalità non è supportata negli scenari non comuni seguenti:
- La colonna ora evento è una colonna generata e sono presenti trasformazioni non di proiezione tra l'origine Delta e la filigrana.
- Esiste una filigrana con più di un'origine Delta nella query di flusso.
- Con l'ordine di tempo dell'evento abilitato, le prestazioni dell'elaborazione iniziale dello snapshot delta potrebbero risultare più lente.
- Ogni micro batch analizza lo snapshot iniziale per filtrare i dati all'interno dell'intervallo di tempo dell'evento corrispondente. Per un'azione di filtro più rapida, è consigliabile usare una colonna di origine Delta come ora dell'evento, in modo che sia possibile applicare il salto dei dati (controllare l'opzione Ignora dati per Delta Lake quando è applicabile). Inoltre, il partizionamento delle tabelle lungo la colonna dell'ora dell'evento può velocizzare ulteriormente l'elaborazione. È possibile controllare l'interfaccia utente di Spark per verificare il numero di file differenziali analizzati per un micro batch specifico.
Esempio
Si supponga di avere una tabella user_events
con una event_time
colonna. La query di streaming è una query di aggregazione. Se si vuole assicurarsi che non vengano visualizzati dati durante l'elaborazione iniziale dello snapshot, è possibile usare:
spark.readStream
.option("withEventTimeOrder", "true")
.table("user_events")
.withWatermark("event_time", "10 seconds")
Nota
È anche possibile abilitare questa opzione con la configurazione di Spark nel cluster che verrà applicata a tutte le query di streaming: spark.databricks.delta.withEventTimeOrder.enabled true
Tabella Delta come sink
È anche possibile scrivere dati in una tabella Delta usando Structured Streaming. Il log delle transazioni consente a Delta Lake di garantire l'elaborazione esattamente una sola volta, anche quando sono presenti altri flussi o query batch in esecuzione simultaneamente sulla tabella.
Nota
La funzione Delta Lake rimuove tutti i file non gestiti da Delta Lake VACUUM
, ma ignora tutte le directory che iniziano con _
. È possibile archiviare in modo sicuro i checkpoint insieme ad altri dati e metadati per una tabella Delta usando una struttura di directory, <table-name>/_checkpoints
ad esempio .
Metrica
È possibile scoprire il numero di byte e il numero di file ancora da elaborare in un processo di query di streaming come numBytesOutstanding
metriche e numFilesOutstanding
. Le metriche aggiuntive includono:
numNewListedFiles
: numero di file Delta Lake elencati per calcolare il backlog per questo batch.backlogEndOffset
: versione della tabella usata per calcolare il backlog.
Se si esegue il flusso in un notebook, è possibile visualizzare queste metriche nella scheda Dati non elaborati nel dashboard di stato della query di streaming:
{
"sources" : [
{
"description" : "DeltaSource[file:/path/to/source]",
"metrics" : {
"numBytesOutstanding" : "3456",
"numFilesOutstanding" : "8"
},
}
]
}
Modalità accodamento
Per impostazione predefinita, i flussi vengono eseguiti in modalità Append, aggiungendo nuovi record alla tabella.
Usare il metodo durante lo toTable
streaming in tabelle, come nell'esempio seguente:
Python
(events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
)
Scala
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Modalità completa
È anche possibile usare Structured Streaming per sostituire l'intera tabella con ogni batch. Un caso d'uso di esempio consiste nel calcolare un riepilogo usando l'aggregazione:
Python
(spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
)
Scala
spark.readStream
.table("events")
.groupBy("customerId")
.count()
.writeStream
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.toTable("events_by_customer")
L'esempio precedente aggiorna continuamente una tabella contenente il numero aggregato di eventi per cliente.
Per le applicazioni con requisiti di latenza più elevati, è possibile risparmiare risorse di calcolo con trigger monouso. Usare queste opzioni per aggiornare le tabelle di aggregazione di riepilogo in base a una pianificazione specifica, elaborando solo i nuovi dati arrivati dopo l'ultimo aggiornamento.
Upsert dalle query di streaming con foreachBatch
È possibile usare una combinazione di e foreachBatch
per scrivere upsert complessi da una query di merge
streaming in una tabella Delta. Vedere Usare foreachBatch per scrivere sink di dati arbitrari.
Questo modello include molte applicazioni, tra cui le seguenti:
- Scrivere aggregazioni di streaming in modalità di aggiornamento: è molto più efficiente rispetto alla modalità completa.
- Scrivere un flusso di modifiche del database in una tabella Delta: la query di merge per la scrittura di dati delle modifiche può essere usata in
foreachBatch
per applicare continuamente un flusso di modifiche a una tabella Delta. - Scrivere un flusso di dati in una tabella Delta con deduplicazione: la query di unione di sola inserimento per la deduplicazione può essere usata in
foreachBatch
per scrivere continuamente dati (con duplicati) in una tabella Delta con deduplicazione automatica.
Nota
- Assicurarsi che l'istruzione
merge
all'internoforeachBatch
di sia idempotente perché i riavvii della query di streaming possono applicare l'operazione nello stesso batch di dati più volte. - Quando
merge
viene usato inforeachBatch
, la frequenza dei dati di input della query di streaming (segnalata tramiteStreamingQueryProgress
e visibile nel grafico della frequenza del notebook) può essere segnalata come un multiplo della frequenza effettiva con cui i dati vengono generati nell'origine. Ciò avviene perchémerge
legge più volte i dati di input che causano la moltiplicazione delle metriche di input. Se si tratta di un collo di bottiglia, è possibile memorizzare nella cache il batch DataFrame prima delmerge
e quindi annullarlo dopo ilmerge
.
L'esempio seguente illustra come usare SQL all'interno foreachBatch
di per eseguire questa attività:
Scala
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
// Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
# Set the dataframe to view name
microBatchOutputDF.createOrReplaceTempView("updates")
# Use the view name to apply MERGE
# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
# In Databricks Runtime 10.5 and below, you must use the following:
# microBatchOutputDF._jdf.sparkSession().sql("""
microBatchOutputDF.sparkSession.sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
È anche possibile scegliere di usare le API Delta Lake per eseguire upsert di streaming, come nell'esempio seguente:
Scala
import io.delta.tables.*
val deltaTable = DeltaTable.forName(spark, "table_name")
// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "table_name")
# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
(deltaTable.alias("t").merge(
microBatchOutputDF.alias("s"),
"s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
.foreachBatch(upsertToDelta)
.outputMode("update")
.start()
)
Scritture di tabelle Idempotenti in foreachBatch
Nota
Databricks consiglia di configurare una scrittura di streaming separata per ogni sink da aggiornare. L'uso foreachBatch
di per scrivere in più tabelle serializza le scritture, riducendo parallelizaiton e aumentando la latenza complessiva.
Le tabelle Delta supportano le opzioni seguenti DataFrameWriter
per eseguire operazioni di scrittura in più tabelle all'interno di foreachBatch
idempotente:
txnAppId
: stringa univoca che è possibile passare a ogni scrittura di dataframe. Ad esempio, è possibile usare l'ID StreamingQuery cometxnAppId
.txnVersion
: numero che aumenta in modo monotonico che funge da versione della transazione.
Delta Lake usa la combinazione di txnAppId
e txnVersion
per identificare le scritture duplicate e ignorarle.
Se una scrittura batch viene interrotta con un errore, la ripetizione dell'esecuzione del batch usa la stessa applicazione e lo stesso ID batch per consentire al runtime di identificare correttamente le scritture duplicate e ignorarle. L'ID applicazione (txnAppId
) può essere qualsiasi stringa univoca generata dall'utente e non deve essere correlata all'ID flusso. Vedere Usare foreachBatch per scrivere sink di dati arbitrari.
Avviso
Se si elimina il checkpoint di streaming e si riavvia la query con un nuovo checkpoint, è necessario specificare un elemento diverso txnAppId
. I nuovi checkpoint iniziano con un ID batch di 0
. Delta Lake usa l'ID batch e txnAppId
come chiave univoca e ignora i batch con valori già visualizzati.
L'esempio di codice seguente illustra questo modello:
Python
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}