Condividi tramite


Letture e scritture di streaming di tabelle delta

Delta Lake è completamente integrato con Spark Structured Streaming tramite readStream e writeStream. Delta Lake supera molte delle limitazioni in genere associate ai sistemi e ai file di streaming, tra cui:

  • Unione di file di piccole dimensioni prodotti dall'inserimento a bassa latenza.
  • Gestione dell'elaborazione "esattamente una volta" con più flussi (o processi batch simultanei).
  • Individuazione efficiente dei file nuovi quando si usano file come origine per un flusso.

Nota

Questo articolo descrive l'uso di tabelle Delta Lake come origini di streaming e sink. Per informazioni su come caricare dati usando tabelle di streaming in Databricks SQL, vedere Caricare dati usando tabelle di streaming in Databricks SQL.

Per informazioni sui join statici di flusso con Delta Lake, vedere Join statici di Stream.

Tabella Delta come origine

Structured Streaming legge in modo incrementale le tabelle Delta. Mentre una query di streaming è attiva su una tabella Delta, i nuovi record vengono elaborati in modo idempotente quando le nuove versioni della tabella eseguono il commit nella tabella di origine.

Gli esempi di codice seguenti illustrano la configurazione di una lettura in streaming usando il nome della tabella o il percorso del file.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Importante

Se lo schema per una tabella Delta viene modificato dopo l'inizio di una lettura di streaming sulla tabella, la query avrà esito negativo. Per la maggior parte delle modifiche dello schema, è possibile riavviare lo streaming per risolvere la mancata corrispondenza dello schema e continuare l'elaborazione.

In Databricks Runtime 12.2 LTS e versioni successive non è possibile eseguire lo streaming da una tabella Delta con mapping di colonne abilitato che ha subito un'evoluzione dello schema non additivi, ad esempio la ridenominazione o l'eliminazione di colonne. Per informazioni dettagliate, vedere Streaming con mapping di colonne e modifiche dello schema.

Limitare la velocità di input

Per controllare i micro batch sono disponibili le opzioni seguenti:

  • maxFilesPerTrigger: numero di nuovi file da considerare in ogni micro batch. Il valore predefinito è 1000.
  • maxBytesPerTrigger: quantità di dati elaborati in ogni micro batch. Questa opzione imposta un valore "soft max", ovvero un batch elabora approssimativamente questa quantità di dati e può elaborare più del limite per far avanzare la query di streaming nei casi in cui l'unità di input più piccola è maggiore di questo limite. Questa impostazione non è impostata per impostazione predefinita.

Se si usa maxBytesPerTrigger insieme a maxFilesPerTrigger, il micro batch elabora i dati fino a quando non viene raggiunto il maxFilesPerTrigger limite o maxBytesPerTrigger .

Nota

Nei casi in cui le transazioni della tabella di origine vengono pulite a causa della logRetentionDuration configurazione e la query di streaming tenta di elaborare tali versioni, per impostazione predefinita la query non riesce a evitare la perdita di dati. È possibile impostare l'opzione su false per ignorare i dati persi e continuare l'elaborazionefailOnDataLoss.

Trasmettere un feed Delta Lake Change Data Capture (CDC)

Il feed di dati delle modifiche delta Lake registra le modifiche apportate a una tabella Delta, inclusi gli aggiornamenti e le eliminazioni. Se abilitata, è possibile eseguire lo streaming da un feed di dati delle modifiche e scrivere la logica per elaborare inserimenti, aggiornamenti ed eliminazioni in tabelle downstream. Sebbene l'output dei dati del feed di dati delle modifiche sia leggermente diverso dalla tabella Delta descritto, offre una soluzione per la propagazione di modifiche incrementali alle tabelle downstream in un'architettura medallion.

Importante

In Databricks Runtime 12.2 LTS e versioni successive non è possibile eseguire lo streaming dal feed di dati delle modifiche per una tabella Delta con mapping di colonne abilitato che ha subito un'evoluzione dello schema non additivi, ad esempio la ridenominazione o l'eliminazione di colonne. Vedere Streaming con mapping di colonne e modifiche dello schema.

Ignorare gli aggiornamenti e le eliminazioni

Structured Streaming non gestisce l'input che non è un accodamento e genera un'eccezione se vengono apportate modifiche alla tabella utilizzata come origine. Esistono due strategie principali per gestire le modifiche che non possono essere propagate automaticamente a valle:

  • È possibile eliminare l'output e il checkpoint e riavviare il flusso dall'inizio.
  • È possibile impostare una di queste due opzioni:
    • ignoreDeletes: ignora le transazioni che eliminano i dati in corrispondenza dei limiti della partizione.
    • skipChangeCommits: ignora le transazioni che eliminano o modificano i record esistenti. skipChangeCommits include ignoreDeletes.

Nota

In Databricks Runtime 12.2 LTS e versioni successive depreca skipChangeCommits l'impostazione ignoreChangesprecedente. In Databricks Runtime 11.3 LTS e versioni precedenti ignoreChanges è l'unica opzione supportata.

La semantica per ignoreChanges differisce notevolmente da skipChangeCommits. Con l'abilitazione di ignoreChanges, i file di dati riscritti nella tabella di origine vengono rigenerati dopo un'operazione di modifica dei dati, ad esempio UPDATE, MERGE INTO, DELETE (all’interno di partizioni) o OVERWRITE. Le righe invariate vengono spesso generate insieme a quelle nuove, quindi i consumer downstream devono essere in grado di gestire i duplicati. Le eliminazioni non vengono propagate downstream. ignoreChanges include ignoreDeletes.

skipChangeCommits ignora completamente le operazioni di modifica dei file. I file di dati riscritti nella tabella di origine a causa dell'operazione di modifica dei dati, ad esempio UPDATE, MERGE INTO, DELETE e OVERWRITE vengono ignorati completamente. Per riflettere le modifiche nelle tabelle di origine upstream, è necessario implementare una logica separata per propagare tali modifiche.

I carichi di lavoro configurati con ignoreChanges continuano a funzionare usando la semantica nota, ma Databricks consiglia di usare skipChangeCommits per tutti i nuovi carichi di lavoro. La migrazione dei carichi di lavoro con ignoreChanges per richiede skipChangeCommits la logica di refactoring.

Esempio

Si supponga, ad esempio, di avere una tabella user_events con datecolonne , user_emaile action partizionate da date. Si estrae dalla tabella ed è necessario eliminarli a causa del GDPR.You stream out of the user_events table and you need to delete data from it due to GDPR.

Quando si elimina in corrispondenza dei limiti della partizione, ovvero si WHERE trova in una colonna di partizione, i file sono già segmentati per valore in modo che l'eliminazione elimini solo i file dai metadati. Quando si elimina un'intera partizione di dati, è possibile usare quanto segue:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Se si eliminano dati in più partizioni (in questo esempio, il filtro in base user_emaila ), usare la sintassi seguente:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Se si aggiorna un user_email oggetto con l'istruzione UPDATE , il file contenente l'oggetto user_email in questione viene riscritto. Usare skipChangeCommits per ignorare i file di dati modificati.

Specificare la posizione iniziale

È possibile usare le opzioni seguenti per specificare il punto iniziale dell'origine di streaming Delta Lake senza elaborare l'intera tabella.

  • startingVersion: versione delta Lake da cui iniziare. Databricks consiglia di omettere questa opzione per la maggior parte dei carichi di lavoro. Quando non è impostato, il flusso inizia dalla versione disponibile più recente, incluso uno snapshot completo della tabella in quel momento.

    Se specificato, il flusso legge tutte le modifiche apportate alla tabella Delta a partire dalla versione specificata (inclusa). Se la versione specificata non è più disponibile, l'avvio del flusso non riesce. È possibile ottenere le versioni di commit dalla version colonna dell'output del comando DESCRIBE HISTORY .

    Per restituire solo le modifiche più recenti, specificare latest.

  • startingTimestamp: timestamp da cui iniziare. Tutte le modifiche apportate alla tabella di cui è stato eseguito il commit o dopo il timestamp (inclusivo) vengono lette dal lettore di streaming. Se il timestamp specificato precede tutti i commit della tabella, la lettura di streaming inizia con il timestamp meno recente disponibile. Uno dei valori possibili:

    • Stringa di timestamp. Ad esempio: "2019-01-01T00:00:00.000Z".
    • Stringa di data. Ad esempio: "2019-01-01".

Non è possibile impostare entrambe le opzioni contemporaneamente. Diventano effettive solo quando si avvia una nuova query di streaming. Se una query di streaming è stata avviata e lo stato di avanzamento è stato registrato nel relativo checkpoint, queste opzioni vengono ignorate.

Importante

Anche se è possibile avviare l'origine di streaming da una versione o un timestamp specificato, lo schema dell'origine di streaming è sempre lo schema più recente della tabella Delta. È necessario assicurarsi che non vi sia alcuna modifica dello schema incompatibile alla tabella Delta dopo la versione o il timestamp specificati. In caso contrario, l'origine di streaming potrebbe restituire risultati non corretti durante la lettura dei dati con uno schema non corretto.

Esempio

Si supponga, ad esempio, di avere una tabella user_events. Se si desidera leggere le modifiche dalla versione 5, usare:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Se si desidera leggere le modifiche dal 2018-10-18, usare:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Elaborare lo snapshot iniziale senza eliminare i dati

Nota

Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive. Questa funzionalità è disponibile in anteprima pubblica.

Quando si usa una tabella Delta come origine di flusso, la query elabora innanzitutto tutti i dati presenti nella tabella. La tabella Delta in questa versione è denominata snapshot iniziale. Per impostazione predefinita, i file di dati della tabella Delta vengono elaborati in base all'ultima modifica del file. Tuttavia, l'ora dell'ultima modifica non rappresenta necessariamente l'ordine di ora dell'evento del record.

In una query di streaming con stato con una filigrana definita, l'elaborazione dei file in base al tempo di modifica può comportare l'elaborazione dei record nell'ordine errato. Ciò potrebbe causare l'eliminazione dei record come eventi tardivi in base alla filigrana.

È possibile evitare il problema di rilascio dei dati abilitando l'opzione seguente:

  • withEventTimeOrder: indica se lo snapshot iniziale deve essere elaborato con l'ordine dell'ora dell'evento.

Con l'ordine di tempo dell'evento abilitato, l'intervallo di tempo dell'evento dei dati dello snapshot iniziale viene suddiviso in bucket di tempo. Ogni micro batch elabora un bucket filtrando i dati all'interno dell'intervallo di tempo. Le opzioni di configurazione maxFilesPerTrigger e maxBytesPerTrigger sono ancora applicabili per controllare le dimensioni del microbatch, ma solo in modo approssimativo a causa della natura dell'elaborazione.

L'immagine seguente mostra questo processo:

Snapshot iniziale

Informazioni rilevanti su questa funzionalità:

  • Il problema di eliminazione dei dati si verifica solo quando lo snapshot Delta iniziale di una query di streaming con stato viene elaborato nell'ordine predefinito.
  • Non è possibile modificare withEventTimeOrder dopo l'avvio della query di flusso durante l'elaborazione dello snapshot iniziale. Per riavviare con withEventTimeOrder la modifica, è necessario eliminare il checkpoint.
  • Se si esegue una query di flusso con conEventTimeOrder abilitato, non è possibile eseguirne il downgrade a una versione DBR che non supporta questa funzionalità fino al completamento dell'elaborazione iniziale dello snapshot. Se è necessario effettuare il downgrade, è possibile attendere il completamento dello snapshot iniziale oppure eliminare il checkpoint e riavviare la query.
  • Questa funzionalità non è supportata negli scenari non comuni seguenti:
    • La colonna ora evento è una colonna generata e sono presenti trasformazioni non di proiezione tra l'origine Delta e la filigrana.
    • Esiste una filigrana con più di un'origine Delta nella query di flusso.
  • Con l'ordine di tempo dell'evento abilitato, le prestazioni dell'elaborazione iniziale dello snapshot delta potrebbero risultare più lente.
  • Ogni micro batch analizza lo snapshot iniziale per filtrare i dati all'interno dell'intervallo di tempo dell'evento corrispondente. Per un'azione di filtro più rapida, è consigliabile usare una colonna di origine Delta come ora dell'evento, in modo che sia possibile applicare il salto dei dati (controllare l'opzione Ignora dati per Delta Lake quando è applicabile). Inoltre, il partizionamento delle tabelle lungo la colonna dell'ora dell'evento può velocizzare ulteriormente l'elaborazione. È possibile controllare l'interfaccia utente di Spark per verificare il numero di file differenziali analizzati per un micro batch specifico.

Esempio

Si supponga di avere una tabella user_events con una event_time colonna. La query di streaming è una query di aggregazione. Se si vuole assicurarsi che non vengano visualizzati dati durante l'elaborazione iniziale dello snapshot, è possibile usare:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Nota

È anche possibile abilitare questa opzione con la configurazione di Spark nel cluster che verrà applicata a tutte le query di streaming: spark.databricks.delta.withEventTimeOrder.enabled true

Tabella Delta come sink

È anche possibile scrivere dati in una tabella Delta usando Structured Streaming. Il log delle transazioni consente a Delta Lake di garantire l'elaborazione esattamente una sola volta, anche quando sono presenti altri flussi o query batch in esecuzione simultaneamente sulla tabella.

Nota

La funzione Delta Lake rimuove tutti i file non gestiti da Delta Lake VACUUM , ma ignora tutte le directory che iniziano con _. È possibile archiviare in modo sicuro i checkpoint insieme ad altri dati e metadati per una tabella Delta usando una struttura di directory, <table-name>/_checkpointsad esempio .

Metrica

È possibile scoprire il numero di byte e il numero di file ancora da elaborare in un processo di query di streaming come numBytesOutstanding metriche e numFilesOutstanding . Le metriche aggiuntive includono:

  • numNewListedFiles: numero di file Delta Lake elencati per calcolare il backlog per questo batch.
    • backlogEndOffset: versione della tabella usata per calcolare il backlog.

Se si esegue il flusso in un notebook, è possibile visualizzare queste metriche nella scheda Dati non elaborati nel dashboard di stato della query di streaming:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Modalità accodamento

Per impostazione predefinita, i flussi vengono eseguiti in modalità Append, aggiungendo nuovi record alla tabella.

Usare il metodo durante lo toTable streaming in tabelle, come nell'esempio seguente:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Modalità completa

È anche possibile usare Structured Streaming per sostituire l'intera tabella con ogni batch. Un caso d'uso di esempio consiste nel calcolare un riepilogo usando l'aggregazione:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

L'esempio precedente aggiorna continuamente una tabella contenente il numero aggregato di eventi per cliente.

Per le applicazioni con requisiti di latenza più elevati, è possibile risparmiare risorse di calcolo con trigger monouso. Usare queste opzioni per aggiornare le tabelle di aggregazione di riepilogo in base a una pianificazione specifica, elaborando solo i nuovi dati arrivati dopo l'ultimo aggiornamento.

Upsert dalle query di streaming con foreachBatch

È possibile usare una combinazione di e foreachBatch per scrivere upsert complessi da una query di merge streaming in una tabella Delta. Vedere Usare foreachBatch per scrivere sink di dati arbitrari.

Questo modello include molte applicazioni, tra cui le seguenti:

  • Scrivere aggregazioni di streaming in modalità di aggiornamento: è molto più efficiente rispetto alla modalità completa.
  • Scrivere un flusso di modifiche del database in una tabella Delta: la query di merge per la scrittura di dati delle modifiche può essere usata in foreachBatch per applicare continuamente un flusso di modifiche a una tabella Delta.
  • Scrivere un flusso di dati in una tabella Delta con deduplicazione: la query di unione di sola inserimento per la deduplicazione può essere usata in foreachBatch per scrivere continuamente dati (con duplicati) in una tabella Delta con deduplicazione automatica.

Nota

  • Assicurarsi che l'istruzione merge all'interno foreachBatch di sia idempotente perché i riavvii della query di streaming possono applicare l'operazione nello stesso batch di dati più volte.
  • Quando merge viene usato in foreachBatch, la frequenza dei dati di input della query di streaming (segnalata tramite StreamingQueryProgress e visibile nel grafico della frequenza del notebook) può essere segnalata come un multiplo della frequenza effettiva con cui i dati vengono generati nell'origine. Ciò avviene perché merge legge più volte i dati di input che causano la moltiplicazione delle metriche di input. Se si tratta di un collo di bottiglia, è possibile memorizzare nella cache il batch DataFrame prima del merge e quindi annullarlo dopo il merge.

L'esempio seguente illustra come usare SQL all'interno foreachBatch di per eseguire questa attività:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

È anche possibile scegliere di usare le API Delta Lake per eseguire upsert di streaming, come nell'esempio seguente:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Scritture di tabelle Idempotenti in foreachBatch

Nota

Databricks consiglia di configurare una scrittura di streaming separata per ogni sink da aggiornare. L'uso foreachBatch di per scrivere in più tabelle serializza le scritture, riducendo parallelizaiton e aumentando la latenza complessiva.

Le tabelle Delta supportano le opzioni seguenti DataFrameWriter per eseguire operazioni di scrittura in più tabelle all'interno di foreachBatch idempotente:

  • txnAppId: stringa univoca che è possibile passare a ogni scrittura di dataframe. Ad esempio, è possibile usare l'ID StreamingQuery come txnAppId.
  • txnVersion: numero che aumenta in modo monotonico che funge da versione della transazione.

Delta Lake usa la combinazione di txnAppId e txnVersion per identificare le scritture duplicate e ignorarle.

Se una scrittura batch viene interrotta con un errore, la ripetizione dell'esecuzione del batch usa la stessa applicazione e lo stesso ID batch per consentire al runtime di identificare correttamente le scritture duplicate e ignorarle. L'ID applicazione (txnAppId) può essere qualsiasi stringa univoca generata dall'utente e non deve essere correlata all'ID flusso. Vedere Usare foreachBatch per scrivere sink di dati arbitrari.

Avviso

Se si elimina il checkpoint di streaming e si riavvia la query con un nuovo checkpoint, è necessario specificare un elemento diverso txnAppId. I nuovi checkpoint iniziano con un ID batch di 0. Delta Lake usa l'ID batch e txnAppId come chiave univoca e ignora i batch con valori già visualizzati.

L'esempio di codice seguente illustra questo modello:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}