API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med DLT
DLT förenklar insamling av ändringsdata (CDC) med API:erna APPLY CHANGES
och APPLY CHANGES FROM SNAPSHOT
. Vilket gränssnitt du använder beror på källan för ändringsdata:
- Använd
APPLY CHANGES
för att bearbeta ändringar från en ändringsdatafeed (CDF). - Använd
APPLY CHANGES FROM SNAPSHOT
(offentlig förhandsversion) för att bearbeta ändringar i databasögonblicksbilder.
Tidigare användes MERGE INTO
-instruktionen ofta för bearbetning av CDC-poster i Azure Databricks. Men MERGE INTO
kan ge felaktiga resultat på grund av poster som inte är sekvenserade eller kräver komplex logik för att ordna om poster.
APPLY CHANGES
-API:et stöds i DLT SQL- och Python-gränssnitten.
APPLY CHANGES FROM SNAPSHOT
-API:et stöds i DLT Python-gränssnittet.
Både APPLY CHANGES
och APPLY CHANGES FROM SNAPSHOT
stöder uppdatering av tabeller med SCD typ 1 och typ 2:
- Använd SCD typ 1 för att uppdatera poster direkt. Historiken behålls inte för uppdaterade poster.
- Använd SCD typ 2 för att behålla en historik över poster, antingen på alla uppdateringar eller vid uppdateringar av en angiven uppsättning kolumner.
Syntax och andra referenser finns i:
Notera
Den här artikeln beskriver hur du uppdaterar tabeller i din DLT-pipeline baserat på ändringar i källdata. Information om hur du registrerar och frågar efter ändringsinformation på radnivå för Delta-tabeller finns i Använda Delta Lake-ändringsdataflöde i Azure Databricks.
Krav
Om du vill använda CDC-API:er måste din pipeline konfigureras för att använda serverlösa DLT-pipelines eller DLT Pro
eller Advanced
versioner.
Hur implementeras CDC med APPLY CHANGES
-API:et?
Genom att automatiskt hantera out-of-sequence-poster säkerställer APPLY CHANGES
-API:et i DLT korrekt bearbetning av CDC-poster och tar bort behovet av att utveckla komplex logik för hantering av out-of-sequence-poster. Du måste ange en kolumn i källdata som ska sekvensera poster, som DLT tolkar som en monotont ökande representation av rätt ordning på källdata. DLT hanterar automatiskt data som kommer i fel ordning. För SCD-typ 2-ändringar sprider DLT lämpliga sekvenseringsvärden till måltabellens __START_AT
- och __END_AT
kolumner. Det bör finnas en distinkt uppdatering per nyckel vid varje sekvenseringsvärde och NULL-sekvenseringsvärden stöds inte.
Om du vill utföra CDC-bearbetning med APPLY CHANGES
skapar du först en strömmande tabell och använder sedan instruktionen APPLY CHANGES INTO
i SQL eller funktionen apply_changes()
i Python för att ange källa, nycklar och sekvensering för ändringsflödet. Om du vill skapa måluppspelningstabellen använder du instruktionen CREATE OR REFRESH STREAMING TABLE
i SQL eller funktionen create_streaming_table()
i Python. Se SCD-typ 1 och typ 2-bearbetning exempel.
Syntaxinformation finns i referensen för DLT SQL eller Python-referens.
Hur implementeras CDC med APPLY CHANGES FROM SNAPSHOT
-API:et?
Viktig
APPLY CHANGES FROM SNAPSHOT
-API:et finns i offentlig förhandsversion.
APPLY CHANGES FROM SNAPSHOT
är ett deklarativt API som effektivt avgör ändringar i källdata genom att jämföra en serie ögonblicksbilder i ordning och sedan köra bearbetningen som krävs för CDC-bearbetning av posterna i ögonblicksbilderna.
APPLY CHANGES FROM SNAPSHOT
stöds endast av DLT Python-gränssnittet.
APPLY CHANGES FROM SNAPSHOT
stöder inmatning av ögonblicksbilder från flera källtyper:
- Använd periodisk inmatning av ögonblicksbilder för att mata in ögonblicksbilder från en befintlig tabell eller vy.
APPLY CHANGES FROM SNAPSHOT
har ett enkelt, strömlinjeformat gränssnitt som stöder regelbunden inmatning av ögonblicksbilder från ett befintligt databasobjekt. En ny ögonblicksbild matas in med varje pipelineuppdatering och inmatningstiden används som ögonblicksbildversion. När en pipeline körs i kontinuerligt läge matas flera ögonblicksbilder in med varje pipelineuppdatering under en period som bestäms av inställningen utlösarintervall för flödet som innehåller APPLY CHANGES FROM SNAPSHOT-bearbetningen. - Använd historisk inmatning av ögonblicksbilder för att bearbeta filer som innehåller ögonblicksbilder av databasen, till exempel ögonblicksbilder som genererats från en Oracle- eller MySQL-databas eller ett informationslager.
Om du vill utföra CDC-bearbetning från valfri källtyp med APPLY CHANGES FROM SNAPSHOT
skapar du först en strömmande tabell och använder sedan funktionen apply_changes_from_snapshot()
i Python för att ange ögonblicksbilden, nycklarna och andra argument som krävs för att implementera bearbetningen. Se exempel på periodisk inmatning av ögonblicksbilder och exempel på historisk inmatning av ögonblicksbilder.
Ögonblicksbilderna som skickas till API:et måste vara i stigande ordning efter version. Om DLT identifierar en ögonblicksbild som är ur funktion, genereras ett felmeddelande.
Syntaxinformation finns i referensen för DLT Python.
Begränsningar
Kolumnen som används för sekvensering måste vara en sorterbar datatyp.
Exempel: SCD-typ 1 och SCD typ 2-bearbetning med CDF-källdata
Följande avsnitt innehåller exempel på frågor av typen DLT SCD typ 1 och typ 2 som uppdaterar måltabeller baserat på källhändelser från ett ändringsdataflöde som:
- Skapar nya användarposter.
- Tar bort en användarpostering.
- Uppdaterar användaruppgifter. I exemplet SCD typ 1 kommer de sista
UPDATE
åtgärderna sent och tas bort från måltabellen, vilket visar hanteringen av händelser som inte är i rätt ordning.
I följande exempel förutsätter vi att du är bekant med att konfigurera och uppdatera DLT-pipelines. Se Självstudie: Kör din första DLT-pipeline.
Om du vill köra de här exemplen måste du börja med att skapa en exempeldatauppsättning. Se till att generera testdata.
Följande är indataposterna för dessa exempel:
userId | Namn | stad | operation | sekvensnummer |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lilja | Cancun | INSERT | 2 |
123 | noll | noll | TA BORT | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Om du avkommenterar den sista raden i exempeldata infogas följande post som anger var poster ska trunkeras:
userId | Namn | stad | verksamhet | sekvensnummer |
---|---|---|---|---|
noll | noll | noll | TRUNKERA | 3 |
Not
I följande exempel finns alternativ för att ange både DELETE
och TRUNCATE
åtgärder, men var och en är valfri.
Process SCD typ-1-uppdateringar
I följande exempel visas bearbetning av SCD typ 1-uppdateringar:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
När du har kört SCD typ 1-exemplet innehåller måltabellen följande poster:
userId | Namn | stad |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lilja | Cancun |
När du har kört SCD typ 1-exemplet med den ytterligare TRUNCATE
posten trunkeras poster 124
och 126
på grund av åtgärden TRUNCATE
i sequenceNum=3
och måltabellen innehåller följande post:
userId | Namn | stad |
---|---|---|
125 | Mercedes | Guadalajara |
Process SCD Typ 2-uppdateringar
I följande exempel visas bearbetning av SCD typ 2-uppdateringar:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
När SCD-typ 2-exemplet har körts innehåller måltabellen följande poster:
userId | Namn | stad | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | noll |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | noll |
126 | Lilja | Cancun | 2 | noll |
En SCD-typ 2-fråga kan också ange en delmängd av utdatakolumner som ska spåras för historik i måltabellen. Ändringar i andra kolumner är uppdaterade på plats i stället för att nya historikposter genereras. I följande exempel visas hur du undantar kolumnen city
från spårning:
I följande exempel visas hur du använder spårningshistorik med SCD-typ 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
När du har kört detta exempel utan den ytterligare TRUNCATE
-posten, innehåller måltabellen de följande posterna:
userId | Namn | stad | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | noll |
125 | Mercedes | Guadalajara | 2 | noll |
126 | Lilja | Cancun | 2 | noll |
Generera testdata
Koden nedan tillhandahålls för att generera en exempeldatauppsättning för användning i exempelfrågorna som finns i den här självstudien. Förutsatt att du har rätt autentiseringsuppgifter för att skapa ett nytt schema och skapa en ny tabell kan du köra dessa instruktioner med antingen en notebook-fil eller Databricks SQL. Följande kod är inte avsedd att köras som en del av en DLT-pipeline:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Exempel: Periodisk bearbetning av ögonblicksbilder
I följande exempel visas SCD typ 2-bearbetning som matar in ögonblicksbilder av en tabell som lagras i mycatalog.myschema.mytable
. Resultatet av bearbetningen skrivs till en tabell med namnet target
.
mycatalog.myschema.mytable
posterar vid tidsstämpeln 2024-01-01 00:00:00
Nyckel | Värde |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
register vid tidsstämpeln 2024-01-01 12:00:00
Nyckel | Värde |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Efter att ha bearbetat snapshots innehåller måltabellen följande poster:
Nyckel | Värde | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 kl. 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | den 1 januari 2024 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | noll |
3 | a3 | 2024-01-01 12:00:00 | noll |
Exempel: Bearbetning av historiska ögonblicksbilder
I följande exempel visas SCD-typ 2-bearbetning som uppdaterar en måltabell baserat på källhändelser från två ögonblicksbilder som lagras i ett molnlagringssystem:
Ögonblicksbild på timestamp
, lagrad i /<PATH>/filename1.csv
Nyckel | Spårningskolumn | Icke-spårningskolumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
Ögonblicksbild på timestamp + 5
, lagrad i /<PATH>/filename2.csv
Nyckel | Spårningskolumn | Ickespårningskolumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
I följande kodexempel visas bearbetning av SCD typ 2-uppdateringar med dessa ögonblicksbilder:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
När ögonblicksbilderna har bearbetats innehåller måltabellen följande poster:
Nyckel | Spårningskolumn | Icke-spårningskolumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | noll |
3 | a3 | b3 | 2 | noll |
4 | a4 | b4_new | 1 | noll |
Lägga till, ändra eller ta bort data i en direktuppspelningstabell
Om din pipeline publicerar tabeller till Unity Catalog kan du använda instruktioner för datamanipuleringsspråk (DML), inklusive infognings-, uppdaterings-, borttagnings- och sammanslagningsinstruktioner, för att ändra måluppspelningstabeller som skapats av APPLY CHANGES INTO
-instruktioner.
Anteckning
- DML-instruktioner som ändrar tabellschemat för en strömmande tabell stöds inte. Se till att DML-uttrycken inte försöker utveckla tabellschemat.
- DML-instruktioner som uppdaterar en strömmande tabell kan endast köras i ett delat Unity Catalog-kluster eller ett SQL-lager med Databricks Runtime 13.3 LTS och senare.
- Eftersom direktuppspelning kräver källor som bara kan läggas till, ange flaggan hoppa överChangeCommits-flaggan när du läser källströmningstabellen om din bearbetning kräver streaming från en källströmningstabell med ändringar. När
skipChangeCommits
anges ignoreras transaktioner som tar bort eller ändrar poster i källtabellen. Om din bearbetning inte kräver en strömmande tabell, kan du använda en materialiserad vy (som inte har begränsningen att endast tillåta tillägg) som måltabell.
Eftersom DLT använder en angiven SEQUENCE BY
kolumn och sprider lämpliga sekvenseringsvärden till __START_AT
- och __END_AT
kolumnerna i måltabellen (för SCD-typ 2), måste du se till att DML-uttryck använder giltiga värden för dessa kolumner för att upprätthålla rätt ordning på poster. Se Hur implementeras CDC med APPLY CHANGES
-API:et?.
Mer information om hur du använder DML-instruktioner med strömmande tabeller finns i Lägga till, ändra eller ta bort data i en strömningstabell.
I följande exempel infogas en aktiv post med en startsekvens på 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Läsa en ändringsdatafeed från en APPLY CHANGES
måltabell
I Databricks Runtime 15.2 och senare kan du läsa ett ändringsdataflöde från en strömmande tabell som är målet för APPLY CHANGES
eller APPLY CHANGES FROM SNAPSHOT
frågor på samma sätt som du läser ett ändringsdataflöde från andra Delta-tabeller. Följande krävs för att läsa ändringsdataflödet från en målströmningstabell:
- Måleströmningstabellen måste publiceras i Unity Catalog. Se till att använda Unity Catalog med dina DLT-pipelines .
- Om du vill läsa ändringsdataflödet från målströmningstabellen måste du använda Databricks Runtime 15.2 eller senare. Om du vill läsa ändringsdataflödet i en annan DLT-pipeline måste pipelinen konfigureras för att använda Databricks Runtime 15.2 eller senare.
Du läser ändringsdataflödet från en målströmningstabell som skapades i en DLT-pipeline på samma sätt som när du läste ett ändringsdataflöde från andra Delta-tabeller. Mer information med exempel i Python och SQL om hur du använder Delta-ändringsdataflöde finns i Använda Delta Lake-ändringsdataflöde i Azure Databricks.
Notera
Posten för ändringsdataflöde innehåller metadata som identifierar typen av ändringshändelse. När en post uppdateras i en tabell innehåller metadata som är förknippad med ändringsposter vanligtvis _change_type
-värden som ställs in på update_preimage
och update_postimage
händelser.
De _change_type
värdena skiljer sig dock åt om uppdateringar görs i målströmningstabellen som innehåller ändrade primärnyckelvärden. När ändringar inkluderar uppdateringar av primära nycklar anges fälten för _change_type
metadata till insert
och delete
händelser. Ändringar i primära nycklar kan ske när manuella uppdateringar görs i ett av nyckelfälten med en UPDATE
- eller MERGE
-instruktion eller, för SCD-tabeller av typ 2, när __start_at
-fältet ändras för att återspegla ett tidigare startsekvensvärde.
Frågan APPLY CHANGES
avgör de primära nyckelvärdena, som skiljer sig åt för SCD-typ 1- och SCD-typ 2-bearbetning:
- För SCD-typ 1-bearbetning och DLT Python-gränssnittet är den primära nyckeln värdet för parametern
keys
i funktionenapply_changes()
. För DLT SQL-gränssnittet är den primära nyckeln de kolumner som definieras avKEYS
-satsen iAPPLY CHANGES INTO
-instruktionen. - För SCD typ 2 är den primära nyckeln parametern
keys
ellerKEYS
-satsen plus returvärdet från operationencoalesce(__START_AT, __END_AT)
, där__START_AT
och__END_AT
är motsvarande kolumner från målströmningstabellen.
Hämta data om poster som har bearbetats av en DLT CDC-frågeställning
Anteckning
Följande mått registreras endast av APPLY CHANGES
frågor och inte av APPLY CHANGES FROM SNAPSHOT
frågor.
Följande mått samlas in av APPLY CHANGES
frågor:
-
num_upserted_rows
: Antalet utdatarader som har uppdaterats och infogats i datamängden under en uppdatering. -
num_deleted_rows
: Antalet befintliga utdatarader som tagits bort från datauppsättningen under en uppdatering.
Måttet num_output_rows
, resultat för icke-CDC-flöden, registreras inte för apply changes
förfrågningar.
Vilka dataobjekt används för DLT CDC-bearbetning?
Notera
- Dessa datastrukturer gäller endast för
APPLY CHANGES
bearbetning, inteAPPLY CHANGES FROM SNAPSHOT
bearbetning. - Dessa datastrukturer gäller endast när måltabellen publiceras till Hive-metaarkivet. Om en pipeline publiceras i Unity Catalog är de interna bakgrundstabellerna inte tillgängliga för användarna.
När du deklarerar måltabellen i Hive-metaarkivet skapas två datastrukturer:
- En vy med det namn som tilldelats måltabellen.
- En intern bakgrundstabell som används av DLT för att hantera CDC-bearbetning. Den här tabellen namnges genom att lägga till
__apply_changes_storage_
i början av måltabellens namn.
Om du till exempel deklarerar en måltabell med namnet dlt_cdc_target
visas en vy med namnet dlt_cdc_target
och en tabell med namnet __apply_changes_storage_dlt_cdc_target
i metaarkivet. Genom att skapa en vy kan DLT filtrera bort den extra information (till exempel gravstenar och versioner) som krävs för att hantera data som inte är i ordning. För att visa de bearbetade datafrågorna, kör en fråga på målvyn. Eftersom schemat för tabellen __apply_changes_storage_
kan ändras för att stödja framtida funktioner eller förbättringar bör du inte använda tabellen för frågor i produktionsmiljö. Om du lägger till data manuellt i tabellen antas posterna komma före andra ändringar eftersom versionskolumnerna saknas.