Dela via


API:er för APPLY CHANGES: Förenkla insamling av ändringsdata med Delta Live Tables

Delta Live Tables förenklar insamling av ändringsdata (CDC) med API:erna APPLY CHANGES och APPLY CHANGES FROM SNAPSHOT. Vilket gränssnitt du använder beror på källan för ändringsdata:

  • Används APPLY CHANGES för att bearbeta ändringar från en cdf (change data feed).
  • Använd APPLY CHANGES FROM SNAPSHOT (offentlig förhandsversion) för att bearbeta ändringar i databasögonblicksbilder.

Tidigare användes -instruktionen MERGE INTO ofta för bearbetning av CDC-poster på Azure Databricks. Kan dock MERGE INTO ge felaktiga resultat på grund av poster som inte är sekvenserade eller kräver komplex logik för att ordna om poster.

APPLY CHANGES-API:et stöds i SQL- och Python-gränssnitten för Delta Live Tables. APPLY CHANGES FROM SNAPSHOT-API:et stöds i Python-gränssnittet Delta Live Tables.

Både APPLY CHANGES och APPLY CHANGES FROM SNAPSHOT stöder uppdatering av tabeller med SCD typ 1 och typ 2:

  • Använd SCD typ 1 för att uppdatera poster direkt. Historiken behålls inte för uppdaterade poster.
  • Använd SCD typ 2 för att behålla en historik över poster, antingen på alla uppdateringar eller vid uppdateringar av en angiven uppsättning kolumner.

Syntax och andra referenser finns i:

Kommentar

Den här artikeln beskriver hur du uppdaterar tabeller i din Delta Live Tables-pipeline baserat på ändringar i källdata. Information om hur du registrerar och frågar efter ändringsinformation på radnivå för Delta-tabeller finns i Använda Delta Lake-ändringsdataflöde i Azure Databricks.

Krav

Om du vill använda CDC-API:erna måste din pipeline konfigureras för att använda serverlösa DLT-pipelines eller Delta Live Tables Pro eller Advancedutgåvorna.

Hur implementeras CDC med API:et APPLY CHANGES ?

Genom att automatiskt hantera out-of-sequence-poster säkerställer APPLY CHANGES-API:et i Delta Live Tables korrekt bearbetning av CDC-poster och tar bort behovet av att utveckla komplex logik för hantering av out-of-sequence-poster. Du måste ange en kolumn i källdata som ska sekvensera poster, som Delta Live Tables tolkar som en monotont ökande representation av rätt ordning på källdata. Delta Live Tables hanterar automatiskt data som kommer i fel ordning. För SCD-typ 2-ändringar sprider Delta Live Tables lämpliga sekvenseringsvärden till måltabellens __START_AT- och __END_AT kolumner. Det bör finnas en distinkt uppdatering per nyckel vid varje sekvenseringsvärde och NULL-sekvenseringsvärden stöds inte.

Om du vill utföra CDC-bearbetning med APPLY CHANGESskapar du först en strömmande tabell och använder sedan instruktionen APPLY CHANGES INTO i SQL eller funktionen apply_changes() i Python för att ange källa, nycklar och sekvensering för ändringsflödet. Om du vill skapa måluppspelningstabellen använder du instruktionen CREATE OR REFRESH STREAMING TABLE i SQL eller funktionen create_streaming_table() i Python. Se exempel på SCD-typ 1 och typ 2-bearbetning .

Syntaxinformation finns i Delta Live Tables SQL-referens eller Python-referens.

Hur implementeras CDC med API:et APPLY CHANGES FROM SNAPSHOT ?

Viktigt!

API:et APPLY CHANGES FROM SNAPSHOT finns i offentlig förhandsversion.

APPLY CHANGES FROM SNAPSHOT är ett deklarativt API som effektivt avgör ändringar i källdata genom att jämföra en serie ögonblicksbilder i ordning och sedan köra bearbetningen som krävs för CDC-bearbetning av posterna i ögonblicksbilderna. APPLY CHANGES FROM SNAPSHOT stöds endast av Python-gränssnittet Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT stöder inmatning av ögonblicksbilder från flera källtyper:

  • Använd periodisk inmatning av ögonblicksbilder för att mata in ögonblicksbilder från en befintlig tabell eller vy. APPLY CHANGES FROM SNAPSHOT har ett enkelt, strömlinjeformat gränssnitt som stöder regelbunden inmatning av ögonblicksbilder från ett befintligt databasobjekt. En ny ögonblicksbild matas in med varje pipelineuppdatering och inmatningstiden används som ögonblicksbildversion. När en pipeline körs i kontinuerligt läge matas flera ögonblicksbilder in med varje pipelineuppdatering under en period som bestäms av inställningen utlösarintervall för flödet som innehåller APPLY CHANGES FROM SNAPSHOT-bearbetningen.
  • Använd historisk inmatning av ögonblicksbilder för att bearbeta filer som innehåller ögonblicksbilder av databasen, till exempel ögonblicksbilder som genererats från en Oracle- eller MySQL-databas eller ett informationslager.

Om du vill utföra CDC-bearbetning från valfri källtyp med APPLY CHANGES FROM SNAPSHOTskapar du först en strömmande tabell och använder sedan funktionen apply_changes_from_snapshot() i Python för att ange ögonblicksbilden, nycklarna och andra argument som krävs för att implementera bearbetningen. Se exemplen på periodisk inmatning av ögonblicksbilder och historisk inmatning av ögonblicksbilder.

Ögonblicksbilderna som skickas till API:et måste vara i stigande ordning efter version. Om Delta Live Tables identifierar en felordnad ögonblicksbild utlöses ett fel.

Syntaxinformation finns i Delta Live Tables Python-referensen.

Begränsningar

Kolumnen som används för sekvensering måste vara en sorterbar datatyp.

Exempel: SCD-typ 1 och SCD typ 2-bearbetning med CDF-källdata

Följande avsnitt innehåller exempel på SCD-typ 1 och typ 2-frågor i Delta Live Tables som uppdaterar måltabeller baserat på källhändelser från ett ändringsdataflöde som:

  1. Skapar nya användarposter.
  2. Tar bort en användarpost.
  3. Uppdaterar användarposter. I exemplet SCD typ 1 kommer de sista UPDATE åtgärderna sent och tas bort från måltabellen, vilket visar hanteringen av händelser som är utanför ordning.

I följande exempel förutsätter vi att du är bekant med att konfigurera och uppdatera Delta Live Tables-pipelines. Se Självstudie: Kör din första Delta Live Tables-pipeline.

Om du vill köra de här exemplen måste du börja med att skapa en exempeldatauppsättning. Se till att Generera testdata.

Följande är indataposterna för dessa exempel:

Användar-ID name ort operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lilja Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Om du avkommenterar den sista raden i exempeldata infogas följande post som anger var poster ska trunkeras:

Användar-ID name ort operation sequenceNum
null null null TRUNCATE 3

Kommentar

I följande exempel finns alternativ för att ange både DELETE åtgärder och åtgärder, men var och TRUNCATE en är valfri.

Bearbeta SCD-typ 1-uppdateringar

I följande exempel visas bearbetning av SCD typ 1-uppdateringar:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Efter att du har kört SCD typ 1-exemplet innehåller måltabellen följande poster:

Användar-ID name ort
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lilja Cancun

Efter att ha kört SCD typ 1-exemplet med den ytterligare TRUNCATE-posten trunkeras posterna 124 och 126 på grund av operationen TRUNCATE vid sequenceNum=3och måltabellen innehåller följande post:

Användar-ID name ort
125 Mercedes Guadalajara

Bearbeta SCD-typ 2-uppdateringar

I följande exempel visas bearbetning av SCD typ 2-uppdateringar:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

När man har kört SCD typ 2-exemplet innehåller måltabellen följande poster:

Användar-ID name ort __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lilja Cancun 2 null

En SCD-typ 2-fråga kan också ange en delmängd av utdatakolumner som ska spåras för historik i måltabellen. Ändringar i andra kolumner uppdateras direkt i stället för att nya historikposter skapas. I följande exempel visas hur du undantar kolumnen city från spårning:

I följande exempel visas hur du använder spårningshistorik med SCD-typ 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

När du har kört det här exemplet utan den ytterligare TRUNCATE-posten innehåller måltabellen följande poster:

Användar-ID name ort __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lilja Cancun 2 null

Generera testdata

Koden nedan tillhandahålls för att generera en exempeldatauppsättning för användning i exempelfrågorna som finns i den här självstudien. Förutsatt att du har rätt autentiseringsuppgifter för att skapa ett nytt schema och skapa en ny tabell kan du köra dessa instruktioner med antingen en notebook-fil eller Databricks SQL. Följande kod är inte avsedd för användning i en Delta Live Tables-pipeline.

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Exempel: Periodisk bearbetning av ögonblicksbilder

I följande exempel visas SCD typ 2-bearbetning som matar in ögonblicksbilder av en tabell som lagras i mycatalog.myschema.mytable. Resultatet av bearbetningen skrivs till en tabell med namnet target.

mycatalog.myschema.mytable poster vid tidsstämpeln 2024-01-01 00:00:00

Tangent Värde
1 a1
2 a2

mycatalog.myschema.mytable poster vid tidsstämpeln 2024-01-01 12:00:00

Tangent Värde
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Efter att ha bearbetat ögonblicksbilderna innehåller måltabellen följande poster:

Tangent Värde __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Exempel: Bearbetning av historiska ögonblicksbilder

I följande exempel visas SCD-typ 2-bearbetning som uppdaterar en måltabell baserat på källhändelser från två ögonblicksbilder som lagras i ett molnlagringssystem:

Ögonblicksbild vid timestamp, lagrad i /<PATH>/filename1.csv

Tangent TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Ögonblicksbild vid timestamp + 5, lagrad i /<PATH>/filename2.csv

Tangent TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

I följande kodexempel visas bearbetning av SCD typ 2-uppdateringar med dessa ögonblicksbilder:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Efter att ha bearbetat ögonblicksbilderna innehåller måltabellen följande poster:

Tangent TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

Lägga till, ändra eller ta bort data i en direktuppspelningstabell

Om din pipeline publicerar tabeller till Unity Catalog kan du använda instruktioner för datamanipuleringsspråk (DML), inklusive infognings-, uppdaterings-, borttagnings- och sammanslagningsinstruktioner, för att ändra måluppspelningstabeller som skapats av APPLY CHANGES INTO-instruktioner.

Kommentar

  • DML-instruktioner som ändrar tabellschemat för en strömmande tabell stöds inte. Se till att DML-uttrycken inte försöker utveckla tabellschemat.
  • DML-instruktioner som uppdaterar en strömmande tabell kan endast köras i ett delat Unity Catalog-kluster eller ett SQL-lager med Databricks Runtime 13.3 LTS och senare.
  • Eftersom strömning kräver datakällor som bara tillåter tillägg, ska du ange flaggan skipChangeCommits när du läser källströmningstabellen, om din bearbetning kräver strömning från en källströmningstabell med ändringar. När skipChangeCommits anges ignoreras transaktioner som tar bort eller ändrar poster i källtabellen. Om bearbetningen inte kräver en streamingtabell kan du använda en materialiserad vy (som saknar tilläggsbegränsningen) som måltabell.

Eftersom Delta Live Tables använder en angiven SEQUENCE BY kolumn och sprider lämpliga sekvenseringsvärden till __START_AT- och __END_AT kolumnerna i måltabellen (för SCD-typ 2), måste du se till att DML-uttryck använder giltiga värden för dessa kolumner för att upprätthålla rätt ordning på poster. Se Hur implementeras CDC med API:et APPLY CHANGES?.

Mer information om hur du använder DML-instruktioner med strömmande tabeller finns i Lägga till, ändra eller ta bort data i en strömningstabell.

I följande exempel infogas en aktiv post med en startsekvens på 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Läs en ändringsdatafeed från en APPLY CHANGES måltabell

I Databricks Runtime 15.2 och senare kan du läsa ett ändringsdataflöde från en strömmande tabell som är målet för APPLY CHANGES eller APPLY CHANGES FROM SNAPSHOT frågor på samma sätt som du läser ett ändringsdataflöde från andra Delta-tabeller. Följande krävs för att läsa ändringsdataflödet från en målströmningstabell:

  • Målströmningstabellen måste publiceras till Unity Catalog. Se Använda Unity Catalog med dina Delta Live Tables-pipelines.
  • Om du vill läsa ändringsdataflödet från målströmningstabellen måste du använda Databricks Runtime 15.2 eller senare. Om du vill läsa ändringsdataflödet i en annan Delta Live Tables-pipeline måste pipelinen konfigureras för att använda Databricks Runtime 15.2 eller senare.

Du läser ändringsdataflödet från en måluppspelningstabell som skapades i en Delta Live Tables-pipeline på samma sätt som när du läste ett ändringsdataflöde från andra Delta-tabeller. Mer information om hur du använder funktionen deltaändringsdataflöde, inklusive exempel i Python och SQL, finns i Använda Delta Lake-ändringsdataflöde i Azure Databricks.

Kommentar

Posten för ändringsdataflöde innehåller metadata som identifierar typen av ändringshändelse. När en post uppdateras i en tabell innehåller metadata för de associerade ändringsposterna vanligtvis _change_type-värden inställda på update_preimage och update_postimage händelser.

De _change_type värdena skiljer sig dock åt om uppdateringar görs i målströmningstabellen som innehåller ändrade primärnyckelvärden. När ändringar inkluderar uppdateringar av primära nycklar anges fälten för _change_type metadata till insert och delete händelser. Ändringar i primära nycklar kan ske när manuella uppdateringar görs i ett av nyckelfälten med en UPDATE- eller MERGE-instruktion eller, för SCD-tabeller av typ 2, när __start_at-fältet ändras för att återspegla ett tidigare startsekvensvärde.

Frågan APPLY CHANGES avgör de primära nyckelvärdena, som skiljer sig åt för SCD-typ 1- och SCD-typ 2-bearbetning:

  • För SCD-typ 1-bearbetning och Delta Live Tables Python-gränssnittet är den primära nyckeln värdet för parametern keys i funktionen apply_changes(). För DELTA Live Tables SQL-gränssnittet är primärnyckeln kolumnerna som definieras av satsen KEYS i instruktionen APPLY CHANGES INTO.
  • För SCD typ 2 är den primära nyckeln parametern keys eller KEYS-villkoret plus returvärdet från åtgärden coalesce(__START_AT, __END_AT), där __START_AT och __END_AT är motsvarande kolumner från målströmningstabellen.

Hämta data om poster hanterade av en CDC-fråga i Delta Live Tables

Kommentar

Följande mått registreras endast av APPLY CHANGES frågor och inte av APPLY CHANGES FROM SNAPSHOT frågor.

Följande mått samlas in av APPLY CHANGES frågor:

  • num_upserted_rows: Antalet utdatarader som infogats eller uppdaterats i datasetet under en uppdatering.
  • num_deleted_rows: Antalet redan existerande utdatarader som tagits bort från datasettet under en uppdatering.

Måttet num_output_rows , utdata för icke-CDC-flöden, samlas inte in för apply changes frågor.

Vilka dataobjekt används för CDC-bearbetning i Delta Live Tables?

Kommentar

  • Dessa datastrukturer gäller endast för APPLY CHANGES bearbetning, inte APPLY CHANGES FROM SNAPSHOT bearbetning.
  • Dessa datastrukturer gäller endast när måltabellen publiceras till Hive-metaarkivet. Om en pipeline publiceras i Unity Catalog är de interna bakgrundstabellerna inte tillgängliga för användarna.

När du deklarerar måltabellen i Hive-metaarkivet skapas två datastrukturer:

  • En vy med det namn som tilldelats måltabellen.
  • En intern bakgrundstabell som används av Delta Live Tables för att hantera CDC-bearbetning. Den här tabellen namnges genom att lägga till __apply_changes_storage_ före måltabellens namn.

Om du till exempel deklarerar en måltabell med namnet dlt_cdc_targetvisas en vy med namnet dlt_cdc_target och en tabell med namnet __apply_changes_storage_dlt_cdc_target i metaarkivet. Om du skapar en vy kan Delta Live Tables filtrera bort den extra information (till exempel gravstenar och versioner) som krävs för att hantera data som inte är i ordning. Om du vill visa bearbetade data frågar du målvyn. Eftersom schemat för __apply_changes_storage_-tabellen kan ändras för att stödja framtida funktioner eller förbättringar bör du inte köra frågor mot tabellen i produktionsmiljö. Om du lägger till data manuellt i tabellen antas posterna komma före andra ändringar eftersom versionskolumnerna saknas.