Le API APPLY CHANGES: Semplificare l'acquisizione di dati di modifica con Delta Live Tables
Delta Live Tables semplifica il *Change Data Capture* (CDC) attraverso le API APPLY CHANGES
e APPLY CHANGES FROM SNAPSHOT
. L'interfaccia usata dipende dall'origine dei dati delle modifiche:
- Usare
APPLY CHANGES
per elaborare le modifiche da un feed di dati delle modifiche (CDF). - Usare
APPLY CHANGES FROM SNAPSHOT
(anteprima pubblica) per elaborare le modifiche negli snapshot del database.
In precedenza, l'istruzione è stata comunemente usata per l'elaborazione MERGE INTO
dei record CDC in Azure Databricks. Tuttavia, MERGE INTO
può produrre risultati non corretti a causa di record out-of-sequence o richiede logica complessa per riordinare i record.
L'API APPLY CHANGES
è supportata nelle interfacce Sql e Python di Delta Live Tables. L'API APPLY CHANGES FROM SNAPSHOT
è supportata nell'interfaccia Delta Live Tables Python.
Sia APPLY CHANGES
che APPLY CHANGES FROM SNAPSHOT
supportano l'aggiornamento di tables con SCD di tipo 1 e di tipo 2.
- Usare SCD di tipo 1 per i record update direttamente. La cronologia non viene mantenuta per i record aggiornati.
- Usare il tipo SCD 2 per conservare una cronologia dei record, sia per tutti gli aggiornamenti sia per quelli relativi a un set specificato di columns.
Per la sintassi e altri riferimenti, vedere:
- Change Data Capture da un feed di modifiche con Python in Delta Live Tables
- Change Data Capture (Cattura delle Modifiche ai Dati) con SQL in Delta Live Tables
Nota
Questo articolo descrive come updatetables nella pipeline Delta Live Tables in base alle modifiche apportate ai dati di origine. Per imparare a registrare ed eseguire query sulle modifiche a livello di riga per Delta tables, consulta il feed delle modifiche di Delta Lake su Azure Databricks.
Requisiti
Per usare le API CDC, la pipeline deve essere configurata per utilizzare pipeline DLT serverless o le edizioni Delta Live TablesPro
o Advanced
.
Come viene implementato CDC con l'API APPLY CHANGES
?
Gestendo automaticamente i record out-of-sequence, l'API APPLY CHANGES
in Delta Live Tables garantisce l'elaborazione corretta dei record CDC e rimuove la necessità di sviluppare logica complessa per la gestione dei record out-of-sequence. È necessario specificare un column nei dati di origine su cui sequenziare i record, che Delta Live Tables interpreta come rappresentazione monotonicamente crescente dell'ordinamento corretto dei dati di origine. Delta Live Tables gestisce automaticamente i dati non in ordine. Per le modifiche al tipo 2, Delta Live Tables propaga la sequenziazione appropriata values alla table del __START_AT
di destinazione e __END_AT
columns. Deve essere presente un update distinto per ogni chiave in ogni valore di sequenza, e le sequenze NULL values non sono supportate.
Per eseguire l'elaborazione CDC con APPLY CHANGES
, innanzitutto si crea uno table di streaming e quindi si utilizza l'istruzione SQL APPLY CHANGES INTO
o la funzione Python apply_changes()
per specificare l'origine, le chiavi e la sequenziazione per il feed di modifiche. Per creare il flusso di destinazione table, utilizzare l'istruzione CREATE OR REFRESH STREAMING TABLE
per SQL o la funzione create_streaming_table()
per Python. Vedere gli esempi di elaborazione scD di tipo 1 e tipo 2.
Per informazioni dettagliate sulla sintassi, vedere il riferimento SQL Delta Live Tables o il riferimento Python .
Come viene implementato CDC con l'API APPLY CHANGES FROM SNAPSHOT
?
Importante
L'API APPLY CHANGES FROM SNAPSHOT
si trova in anteprima pubblica.
APPLY CHANGES FROM SNAPSHOT
è un'API dichiarativa che determina in modo efficiente le modifiche nei dati di origine confrontando una serie di snapshot in ordine e quindi esegue l'elaborazione necessaria per l'elaborazione CDC dei record negli snapshot.
APPLY CHANGES FROM SNAPSHOT
è supportato solo dall'interfaccia Delta Live Tables Python.
APPLY CHANGES FROM SNAPSHOT
supporta l'inserimento di snapshot da più tipi di origine:
- Utilizzare l'ingestione periodica di snapshot per importare snapshot da un table o da una vista esistente.
APPLY CHANGES FROM SNAPSHOT
dispone di un'interfaccia semplice e semplificata per supportare periodicamente l'inserimento di snapshot da un oggetto di database esistente. Un nuovo snapshot viene inserito ad ogni pipeline updatee l'orario di inserimento viene usato come versione dello snapshot. Quando una pipeline viene eseguita in modalità continua, più snapshot vengono acquisiti con ogni pipeline update in un periodo determinato dall'impostazione dell'intervallo di trigger per il flusso che include l'elaborazione di "APPLY CHANGES FROM SNAPSHOT". - Usare l'inserimento di snapshot cronologici per elaborare i file contenenti snapshot del database, ad esempio gli snapshot generati da un database Oracle o MySQL o da un data warehouse.
Per eseguire l'elaborazione CDC da qualsiasi tipo di origine con APPLY CHANGES FROM SNAPSHOT
, prima si crea uno streaming table e poi si utilizza la funzione apply_changes_from_snapshot()
in Python per specificare lo snapshot, le chiavi e altri argomenti necessari al fine di implementare l'elaborazione. Vedere gli esempi periodici di inserimento snapshot e inserimento di snapshot cronologici.
Gli snapshot passati all'API devono essere in ordine crescente in base alla versione. Se Delta Live Tables rileva uno snapshot non ordinato, viene generato un errore.
Per i dettagli sulla sintassi, consultare il riferimento Python per Delta Live Tables.
Limiti
Il column utilizzato per la sequenziazione deve essere un tipo di dati ordinabile.
Esempio: elaborazione scD di tipo 1 e SCD tipo 2 con dati di origine CDF
Le sezioni seguenti forniscono esempi di query delta live Tables scD di tipo 1 e tipo 2 che updatetables di destinazione in base agli eventi di origine di un feed di dati delle modifiche che:
- Crea nuovi record utente.
- Elimina un record utente.
- Aggiorna i record utente. Nell'esempio SCD di tipo 1, le ultime operazioni di
UPDATE
arrivano in ritardo e vengono eliminate dal target table, dimostrando la gestione degli eventi fuori ordine.
Gli esempi seguenti presuppongono familiarità con la configurazione e l'aggiornamento di pipeline Delta Live Tables. Consulta Esercitazione: Esegui la tua prima pipeline Delta Live Tables.
Per eseguire questi esempi, è necessario iniziare creando un set di dati di esempio. Vedere Generate dati di test.
Di seguito sono riportati i record di input per questi esempi:
userId | name | city | operation (operazione) | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Giglio | Cancun | INSERT | 2 |
123 | Null | Null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Se si rimuove il commento dalla riga finale nei dati di esempio, verrà insert il record seguente che specifica che i record where devono essere troncati:
userId | name | city | operation (operazione) | sequenceNum |
---|---|---|---|---|
Null | Null | Null | TRUNCATE | 3 |
Nota
Tutti gli esempi seguenti includono opzioni per specificare entrambe DELETE
le operazioni e TRUNCATE
, ma ognuna è facoltativa.
Elaborare gli aggiornamenti del tipo 1 di scD
Nell'esempio seguente viene illustrata l'elaborazione degli aggiornamenti del tipo 1 di scD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Dopo aver eseguito l'esempio SCD di tipo 1, il table di destinazione contiene i record seguenti:
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Giglio | Cancun |
Dopo aver eseguito l'esempio SCD di tipo 1 con il record aggiuntivo TRUNCATE
, i record 124
e 126
vengono troncati a causa dell'operazione TRUNCATE
a sequenceNum=3
, e il record seguente si trova nel table di destinazione.
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
Elaborare gli aggiornamenti del tipo 2 di scD
Nell'esempio seguente viene illustrata l'elaborazione degli aggiornamenti del tipo 2 di scD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Dopo aver eseguito l'esempio di tipo 2 SCD, il table di destinazione contiene i seguenti record:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | Null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | Null |
126 | Giglio | Cancun | 2 | Null |
È anche possibile specificare un sottoinsieme di output columns da monitorare per la cronologia nella destinazione table. Le modifiche apportate ad altri columns vengono aggiornate direttamente anziché generare nuovi record di cronologia. L'esempio seguente illustra l'esclusione del city
column dal rilevamento:
L'esempio seguente illustra l'uso della cronologia delle tracce con il tipo 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Dopo aver eseguito questo esempio senza il record aggiuntivo di TRUNCATE
, il record di destinazione table contiene i seguenti record:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | Null |
125 | Mercedes | Guadalajara | 2 | Null |
126 | Giglio | Cancun | 2 | Null |
Generate dati di test
Il codice seguente viene fornito per generate un set di dati di esempio da usare nelle query di esempio presenti in questa esercitazione. Supponendo di avere il credentials appropriato per creare un nuovo schema e creare una nuova table, è possibile eseguire queste istruzioni con un notebook o Databricks SQL. Il codice seguente non destinato a essere eseguito come parte di una pipeline delta live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Esempio: Elaborazione periodica degli snapshot
Nell'esempio seguente viene illustrata l'elaborazione del tipo 2 SCD che inserisce snapshot di una table archiviati a mycatalog.myschema.mytable
. I risultati dell'elaborazione vengono scritti in un table denominato target
.
mycatalog.myschema.mytable
record al timestamp 2024-01-01 00:00:00
Chiave | valore |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
record al timestamp 2024-01-01 12:00:00
Chiave | valore |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Dopo l'elaborazione degli snapshot, la destinazione table contiene i seguenti record:
Chiave | valore | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | Null |
3 | a3 | 2024-01-01 12:00:00 | Null |
Esempio: Elaborazione di snapshot cronologici
L'esempio seguente illustra l'elaborazione SCD di tipo 2 che aggiorna il target table in base agli eventi sorgente di due snapshot archiviati in un sistema di archiviazione cloud.
Snapshot in timestamp
, archiviato in /<PATH>/filename1.csv
Chiave | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Snapshot in timestamp + 5
, archiviato in /<PATH>/filename2.csv
Chiave | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
L'esempio di codice seguente illustra l'elaborazione degli aggiornamenti di tipo 2 con questi snapshot:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Dopo l'elaborazione degli snapshot, la destinazione table contiene i seguenti record:
Chiave | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | Null |
3 | a3 | b3 | 2 | Null |
4 | a4 | b4_new | 1 | Null |
Aggiungere, modificare o eliminare dati in un flusso di destinazione table
Se la pipeline pubblica tables in Unity Catalog, è possibile usare istruzioni DML (Data Manipulation Language)( insert, update, delete e merge ) per modificare il flusso di destinazione tables creato dalle istruzioni APPLY CHANGES INTO
.
Nota
- Le istruzioni DML che modificano il tableschema di un table di streaming non sono supportate. Assicurati che le tue istruzioni DML non tentino di evolvere il tableschema.
- Le istruzioni DML che update un table di streaming possono essere eseguite solo in un cluster Unity condiviso Catalog o in un'istanza di SQL Warehouse usando Databricks Runtime 13.3 LTS e versioni successive.
- Poiché lo streaming richiede origini dati di sola accodamento, se l'elaborazione richiede lo streaming da un flusso di origine table con modifiche (ad esempio, dalle istruzioni DML), setil flag skipChangeCommits durante la lettura del flusso di origine table. Quando
skipChangeCommits
è set, le transazioni che eliminano o modificano i record nel table di origine vengono ignorate. Se l'elaborazione non richiede un flusso table, si può utilizzare una vista materializzata (che non ha la restrizione di solo accodamento) come destinazione table.
Poiché Delta Live Tables usa un SEQUENCE BY
column specificato e propaga values di sequenziazione appropriate al __START_AT
e __END_AT
columns del table di destinazione (per il tipo SCD 2), è necessario assicurarsi che le istruzioni DML usino values valide per questi columns per mantenere l'ordinamento corretto dei record. Vedere Come viene implementato CDC con l'API APPLY CHANGES?.
Per altre informazioni sull'uso di istruzioni DML con flussi tables, vedere Aggiungere, modificare o eliminare dati in un flusso table.
Nell'esempio seguente viene inserito un record attivo con una sequenza iniziale pari a 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Leggere un feed di dati delle modifiche da un APPLY CHANGES
table di destinazione
In Databricks Runtime 15.2 e versioni successive, è possibile leggere un feed di dati delle modifiche da un table in streaming che è la destinazione delle query APPLY CHANGES
o APPLY CHANGES FROM SNAPSHOT
nello stesso modo in cui si legge un feed di dati delle modifiche da altri Delta tables. È necessario quanto segue per leggere il feed di dati delle modifiche da un flusso di destinazione table:
- La destinazione dello streaming table deve essere pubblicata in Unity Catalog. Consulta per usare Catalog Unity con le tue pipeline Delta Live Tables.
- Per leggere il feed di dati delle modifiche dal flusso di destinazione table, devi usare Databricks Runtime 15.2 o versione successiva. Per leggere il feed di dati delle modifiche in un'altra pipeline di Tables Delta Live, la pipeline deve essere configurata per l'uso di Databricks Runtime 15.2 o versione successiva.
Il feed di dati delle modifiche viene letto da un table di streaming di destinazione creato in una pipeline Delta Live Tables, allo stesso modo di come si legge un feed di dati delle modifiche da altri tablesDelta. Per altre informazioni sull'uso della funzionalità del feed di dati delle modifiche Delta, inclusi esempi in Python e SQL, vedere Usare il feed di dati delle modifiche Delta Lake in Azure Databricks.
Nota
Il record del feed di dati delle modifiche include metadati che identificano il tipo di evento di modifica. Quando un record viene aggiornato in un table, i metadati per i record di modifica associati in genere includono _change_type
valuesset fino agli eventi update_preimage
e update_postimage
.
Tuttavia, i _change_type
values sono diversi se vengono eseguiti aggiornamenti al flusso di destinazione table che includono la modifica della chiave primaria values. Quando le modifiche includono aggiornamenti alle chiavi primarie, i campi dei metadati _change_type
vengono aggiornati a set e sono relativi agli eventi insert
e delete
. Le modifiche apportate alle chiavi primarie possono verificarsi quando vengono apportati aggiornamenti manuali a uno dei campi chiave con un'istruzione UPDATE
o MERGE
oppure, per il tipo SCD 2 tables, quando il campo __start_at
cambia al fine di riflettere un valore della sequenza iniziale precedente.
La query APPLY CHANGES
determina la chiave primaria values, che differisce nell'elaborazione SCD di tipo 1 e SCD di tipo 2.
- Per l'elaborazione del tipo 1 SCD e l'interfaccia Python Delta Live Tables, la chiave primaria è il valore del parametro
keys
nella funzioneapply_changes()
. Per l'interfaccia SQL di Delta Live Tables la chiave primaria è la columns definita dalla clausolaKEYS
nell'istruzioneAPPLY CHANGES INTO
. - Per il tipo SCD 2, la chiave primaria è il parametro
keys
o la clausolaKEYS
, più il valore restituito dall'operazionecoalesce(__START_AT, __END_AT)
. where,__START_AT
e__END_AT
sono i columns corrispondenti del flusso di destinazione table.
Get dati sui record elaborati da una query Delta Live Tables CDC
Nota
Le metriche seguenti vengono acquisite solo dalle APPLY CHANGES
query e non dalle APPLY CHANGES FROM SNAPSHOT
query.
Le metriche seguenti vengono acquisite dalle APPLY CHANGES
query:
-
num_upserted_rows
: numero di righe di output inserite nel set di dati durante un update. -
num_deleted_rows
: numero di righe di output esistenti eliminate dal set di dati durante un update.
La num_output_rows
metrica, l'output per i flussi non CDC, non viene acquisito per apply changes
le query.
Quali oggetti dati vengono usati per l'elaborazione di Delta Live Tables CDC?
Nota
- Queste strutture di dati si applicano solo all'elaborazione
APPLY CHANGES
, non all'elaborazioneAPPLY CHANGES FROM SNAPSHOT
. - Queste strutture di dati si applicano solo quando il table di destinazione viene pubblicato nel metastore Hive. Se una pipeline pubblica in Unity Catalog, il supporto interno tables non è accessibile agli utenti.
Quando si dichiara il target table nel metastore Hive, si creano due strutture dati:
- Una visualizzazione che utilizza il nome assegnato al tabledi destinazione.
- Un supporto interno table usato da Delta Live Tables per gestire l'elaborazione CDC. Questo table viene denominato aggiungendo
__apply_changes_storage_
al nome del table di destinazione.
Ad esempio, se si dichiara un table di destinazione denominato dlt_cdc_target
, verrà visualizzata una vista denominata dlt_cdc_target
e una table denominata __apply_changes_storage_dlt_cdc_target
nel metastore. La creazione di una vista consente a Delta Live Tables di filtrare le informazioni aggiuntive (ad esempio, le pietre rimosse e le versioni) necessarie per gestire i dati non ordinati. Per visualizzare i dati elaborati, eseguire una query sulla vista di destinazione. Poiché la schema del __apply_changes_storage_
table potrebbe cambiare per supportare funzionalità o miglioramenti futuri, non è consigliabile eseguire una query sul table per l'uso in produzione. Se si aggiungono dati manualmente al table, si presume che i record precedano altre modifiche perché la versione columns non è presente.