Condividi tramite


Utilizzare il feed delle modifiche dei dati di Delta Lake in Azure Databricks

Il feed di dati delle modifiche consente ad Azure Databricks di tenere traccia delle modifiche a livello di riga tra le versioni di una tabella Delta. Se abilitata in una tabella Delta, il runtime registra gli eventi di modifica per tutti i dati scritti nella tabella. Sono inclusi i dati di riga insieme ai metadati che indicano se la riga specificata è stata inserita, eliminata o aggiornata.

Importante

Il feed di dati delle modifiche funziona in combinazione con la cronologia delle tabelle per fornire informazioni sulle modifiche. Poiché la clonazione di una tabella Delta crea una cronologia separata, il feed di dati delle modifiche nelle tabelle clonate non corrisponde a quello della tabella originale.

Elaborare in modo incrementale i dati delle modifiche

Databricks consiglia di usare il feed di dati delle modifiche in combinazione con Structured Streaming per elaborare in modo incrementale le modifiche dalle tabelle Delta. È necessario usare Structured Streaming per Azure Databricks per tenere traccia automatica delle versioni per il feed di dati delle modifiche della tabella.

Nota

DLT offre funzionalità per facilitare la propagazione dei dati delle modifiche e l'archiviazione dei risultati come tabelle di dimensione lentamente variabile di tipo 1 o tipo 2. Consulta LE API DI APPLY CHANGES: Semplifica la cattura dei dati delle modifiche (Change Data Capture) con DLT.

Per leggere il feed di dati delle modifiche da una tabella, è necessario abilitare il feed di dati delle modifiche in tale tabella. Vedere Abilitare il feed di dati delle modifiche.

Imposta l'opzione readChangeFeed su true quando configuri un flusso contro una tabella per leggere il feed di dati delle modifiche, come illustrato nel seguente esempio di sintassi:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Per impostazione predefinita, il flusso restituisce l'istantanea più recente della tabella quando il flusso viene avviato per la prima volta come INSERT e le modifiche future come dati di modifica.

I commit dei dati modificati vengono eseguiti come parte della transazione Delta Lake e diventano disponibili contemporaneamente ai nuovi dati che vengono inseriti nella tabella.

Facoltativamente, è possibile specificare una versione iniziale. Vedere È consigliabile specificare una versione iniziale?.

Il feed di dati delle modifiche supporta anche l'esecuzione batch, che richiede la specifica di una versione iniziale. Consulta Leggi le modifiche nelle query batch.

Opzioni come limiti di frequenza (maxFilesPerTrigger, maxBytesPerTrigger) e excludeRegex sono supportate anche durante la lettura dei dati delle modifiche.

La limitazione della frequenza può essere atomica per le versioni diverse dalla versione dello snapshot iniziale. Ovvero, l'intera versione del commit sarà soggetta a limitazione di frequenza o verrà restituito l'intero commit.

È necessario specificare una versione iniziale?

Facoltativamente, è possibile specificare una versione iniziale se si desidera ignorare le modifiche apportate prima di una determinata versione. È possibile specificare una versione usando un timestamp o il numero di ID versione registrato nel log delle transazioni Delta.

Nota

È necessaria una versione iniziale per le letture batch e molti modelli batch possono trarre vantaggio dall'impostazione di una versione finale facoltativa.

Quando si configurano carichi di lavoro structured streaming che coinvolgono feed di dati delle modifiche, è importante comprendere come specificare una versione iniziale influisce sull'elaborazione.

Molti carichi di lavoro di streaming, in particolare le nuove pipeline di elaborazione dati, traggono vantaggio dal comportamento predefinito. Con il comportamento predefinito, il primo batch viene elaborato quando il flusso di dati registra tutti i record esistenti nella tabella come operazioni di INSERT nel feed di dati delle modifiche.

Se la tabella di destinazione contiene già tutti i record con modifiche appropriate fino a un determinato punto, specificare una versione iniziale per evitare di elaborare lo stato della tabella di origine come INSERT eventi.

Nel seguente esempio, la sintassi recupera da un errore di streaming in cui il checkpoint è stato danneggiato. In questo esempio si presuppongono le condizioni seguenti:

  1. Il feed di dati delle modifiche è stato abilitato nella tabella di origine al momento della creazione della tabella.
  2. La tabella downstream di destinazione ha elaborato tutte le modifiche fino alla versione 75 inclusa.
  3. La cronologia delle versioni per la tabella di origine è disponibile per le versioni 70 e successive.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

In questo esempio è necessario specificare anche una nuova posizione del checkpoint.

Importante

Se si specifica una versione iniziale, il flusso non viene avviato da un nuovo checkpoint se la versione iniziale non è più presente nella cronologia delle tabelle. Delta Lake pulisce automaticamente le versioni cronologiche, ovvero tutte le versioni iniziali specificate vengono eliminate.

Vedere È possibile usare il feed di dati delle modifiche per riprodurre l'intera cronologia di una tabella?

Leggere le modifiche nelle query batch

È possibile usare la sintassi delle query batch per leggere tutte le modifiche a partire da una determinata versione o per leggere le modifiche all'interno di un intervallo specificato di versioni.

Specificare una versione come numero intero e un timestamp come stringa nel formato yyyy-MM-dd[ HH:mm:ss[.SSS]].

Le versioni iniziali e finali sono incluse nelle query. Per leggere le modifiche da una determinata versione iniziale alla versione più recente della tabella, specificare solo la versione iniziale.

Se si specifica una versione precedente o un timestamp precedente a uno che ha registrato eventi di modifica, ovvero quando il feed di dati delle modifiche è stato abilitato, viene generato un errore che indica che il feed di dati delle modifiche non è stato abilitato.

Gli esempi di sintassi seguenti illustrano l'uso delle opzioni della versione iniziale e finale con letture batch:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Nota

Per impostazione predefinita, se un utente passa una versione o un timestamp che supera l'ultimo commit in una tabella, viene generato l'errore timestampGreaterThanLatestCommit . In Databricks Runtime 11.3 LTS ed eventuali versioni successive, il feed di dati delle modifiche può gestire il caso di versione fuori dal range se l'utente imposta la seguente configurazione su true:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Se si specifica una versione iniziale maggiore dell'ultimo commit in una tabella o un timestamp di inizio più recente dell'ultimo commit in una tabella, quando la configurazione precedente è abilitata, viene restituito un risultato di lettura vuoto.

Se si specifica una versione finale maggiore dell'ultimo commit in una tabella o un timestamp di fine più recente dell'ultimo commit in una tabella, quando la configurazione precedente è abilitata in modalità di lettura batch, tutte le modifiche tra la versione iniziale e l'ultimo commit vengono restituite.

Qual è lo schema per il feed di dati delle modifiche?

Quando si legge dal feed di dati delle modifiche per una tabella, viene usato lo schema per la versione più recente della tabella.

Nota

La maggior parte delle operazioni di modifica ed evoluzione dello schema è completamente supportata. La tabella con la mappatura delle colonne abilitata non supporta tutti i casi d'uso e mostra un comportamento diverso. Vedere Modificare le limitazioni del feed di dati per le tabelle con mapping delle colonne abilitate.

Oltre alle colonne di dati dello schema della tabella Delta, il feed di dati delle modifiche contiene colonne di metadati che identificano il tipo di evento di modifica:

Nome colonna Tipo Valori
_change_type String insert, update_preimage , update_postimage, delete(1)
_commit_version Lungo Versione del log o della tabella Delta contenente la modifica.
_commit_timestamp Marca temporale Timestamp associato al momento della creazione del commit.

(1)preimage è il valore prima dell'aggiornamento, postimage è il valore dopo l'aggiornamento.

Nota

Non è possibile abilitare il feed di dati delle modifiche in una tabella se lo schema contiene colonne con gli stessi nomi di queste colonne aggiunte. Rinomina le colonne nella tabella per risolvere questo conflitto prima di tentare di abilitare il feed di dati delle modifiche.

Abilitare il feed di dati delle modifiche

È possibile leggere solo il feed di dati delle modifiche per le tabelle abilitate. È necessario abilitare in modo esplicito l'opzione flusso di dati delle modifiche usando uno dei metodi seguenti.

  • Nuova tabella: Impostare la proprietà della tabella delta.enableChangeDataFeed = true nel comando CREATE TABLE.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tabella esistente: impostare la proprietà delta.enableChangeDataFeed = true nella ALTER TABLE del comando.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tutte le nuove tabelle:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Importante

Vengono registrate solo le modifiche apportate dopo aver abilitato il feed di dati delle modifiche. Le modifiche passate a una tabella non vengono acquisite.

Modificare l'archiviazione dei dati

L'abilitazione del feed di dati delle modifiche comporta un piccolo aumento dei costi di archiviazione per una tabella. I record di dati delle modifiche vengono generati durante l'esecuzione della query e in genere sono molto più piccoli delle dimensioni totali dei file riscritti.

Azure Databricks registra i dati delle modifiche per le operazioni UPDATE, DELETE e MERGE nella cartella _change_data all'interno della directory della tabella. Alcune operazioni, ad esempio operazioni di sola inserimento ed eliminazioni di partizioni complete, non generano dati nella _change_data directory perché Azure Databricks può calcolare in modo efficiente il feed di dati delle modifiche direttamente dal log delle transazioni.

Tutte le letture sui file di dati nella _change_data cartella devono passare attraverso le API Delta Lake supportate.

I file nella _change_data cartella seguono i criteri di conservazione della tabella. I dati del feed delle modifiche vengono eliminati quando il comando VACUUM viene eseguito.

È possibile usare il feed di dati delle modifiche per riprodurre l'intera cronologia di una tabella?

Il feed di dati delle modifiche non è destinato a fungere da record permanente di tutte le modifiche apportate a una tabella. Il feed di dati di modifica registra solo le modifiche apportate dopo l'abilitazione.

Il feed di dati delle modifiche e Delta Lake consentono di ricostruire sempre uno snapshot completo di una tabella di origine, ovvero è possibile avviare un nuovo flusso letto in una tabella con feed di dati delle modifiche abilitato e acquisire la versione corrente di tale tabella e tutte le modifiche apportate dopo.

È necessario considerare i record nel feed di dati delle modifiche come temporanei e accessibili solo per una finestra di conservazione specificata. Il log delle transazioni Delta rimuove le versioni delle tabelle e le corrispondenti versioni del feed di dati delle modifiche a intervalli regolari. Quando una versione viene rimossa dal log delle transazioni, non è più possibile leggere il feed di dati delle modifiche per tale versione.

Se il caso d'uso richiede la gestione di una cronologia permanente di tutte le modifiche apportate a una tabella, è consigliabile usare la logica incrementale per scrivere record dal feed di dati delle modifiche a una nuova tabella. L'esempio di codice seguente illustra l'uso trigger.AvailableNowdi , che sfrutta l'elaborazione incrementale di Structured Streaming, ma elabora i dati disponibili come carico di lavoro batch. È possibile pianificare questo carico di lavoro in modo asincrono con le pipeline di elaborazione principali per creare un backup del feed di dati delle modifiche per scopi di controllo o riproduzione completa.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Modificare le limitazioni del feed di dati per le tabelle con mapping di colonne abilitato

Con il mapping delle colonne abilitato in una tabella Delta, è possibile eliminare o rinominare colonne nella tabella senza riscrivere i file di dati per i dati esistenti. Con il mapping delle colonne abilitato, il feed di dati delle modifiche presenta limitazioni dopo l'esecuzione di modifiche dello schema non additive, come la rinominazione o l'eliminazione di una colonna, la modifica del tipo di dati o le modifiche alla nullabilità.

Importante

  • Non è possibile leggere il feed di dati delle modifiche per una transazione o un intervallo in cui si verifica una modifica dello schema non additivo utilizzando la semantica batch.
  • In Databricks Runtime 12.2 LTS e versioni precedenti, le tabelle con mapping di colonna abilitato, che hanno subito modifiche dello schema non additive, non supportano le letture in streaming sul flusso di dati delle modifiche. Vedere Streaming con mapping di colonne e modifiche dello schema.
  • In Databricks Runtime 11.3 LTS e versioni precedenti, non è possibile leggere il feed di modifica per le tabelle con mapping di colonne abilitato che hanno subito ridenominazioni o eliminazioni di colonne.

In Databricks Runtime 12.2 LTS e versioni successive è possibile eseguire letture batch nel feed di dati delle modifiche per le tabelle con mapping di colonne abilitato che hanno subito modifiche dello schema non additive. Anziché usare lo schema della versione più recente della tabella, le operazioni di lettura usano lo schema della versione finale della tabella specificata nella query. La query fallisce ancora se l'intervallo di versioni specificato comprende una modifica dello schema non additiva.