Sdílet prostřednictvím


Rozhraní API APPLY CHANGES: Usnadněte zachytávání změn dat pomocí DLT

DLT zjednodušuje zachytávání dat změn (CDC) pomocí rozhraní API APPLY CHANGES a APPLY CHANGES FROM SNAPSHOT. Rozhraní, které používáte, závisí na zdroji změn dat:

  • Ke zpracování změn z datového kanálu změn (CDF) použijte APPLY CHANGES.
  • Ke zpracování změn ve snímcích databáze použijte APPLY CHANGES FROM SNAPSHOT (Public Preview).

Dříve se příkaz MERGE INTO běžně používal ke zpracování záznamů CDC v Azure Databricks. MERGE INTO ale může vést k nesprávným výsledkům kvůli zastaralým záznamům nebo vyžaduje složitou logiku pro opakované řazení záznamů.

Rozhraní APPLY CHANGES API je podporováno v rozhraních DLT SQL a Python. Rozhraní APPLY CHANGES FROM SNAPSHOT API je podporováno v rozhraní DLT Python.

APPLY CHANGES i APPLY CHANGES FROM SNAPSHOT podporují aktualizaci tabulek pomocí scd typu 1 a typu 2:

  • Pomocí scd typu 1 aktualizujte záznamy přímo. Historie se neuchovává u aktualizovaných záznamů.
  • Pomocí scd typu 2 zachovejte historii záznamů, a to buď u všech aktualizací, nebo aktualizací v zadané sadě sloupců.

Syntaxi a další odkazy najdete tady:

Poznámka

Tento článek popisuje, jak aktualizovat tabulky v kanálu DLT na základě změn ve zdrojových datech. Informace o záznamu a dotazování informací o změnách na úrovni řádků pro tabulky Delta najdete v tématu Použití datového kanálu změn Delta Lake v Azure Databricks.

Požadavky

Pokud chcete používat rozhraní API CDC, musí být váš kanál nakonfigurovaný tak, aby používal bezserverové kanály DLT nebo Pro dlt nebo Advancededice.

Jak se CDC implementuje s rozhraním API APPLY CHANGES?

Díky automatickému zpracování záznamů mimo posloupnost zajišťuje rozhraní API APPLY CHANGES v DLT správné zpracování záznamů CDC a odstraňuje potřebu vyvíjet složitou logiku pro jejich zpracování. Ve zdrojových datech musíte zadat sloupec, na kterém se mají sekvencovat záznamy, které DLT interpretuje jako monotonicky rostoucí reprezentaci správného řazení zdrojových dat. DLT automaticky zpracovává data, která přicházejí mimo pořadí. U změn typu 2 typu SCD rozšíří DLT příslušné hodnoty sekvencování do sloupců __START_AT a __END_AT cílové tabulky. V každé hodnotě sekvencování by měla existovat jedna samostatná aktualizace klíče a hodnoty sekvencování NULL nejsou podporovány.

Pokud chcete provádět zpracování CDC pomocí APPLY CHANGES, nejprve vytvoříte streamovací tabulku a pak použijete příkaz APPLY CHANGES INTO v SQL nebo funkci apply_changes() v Pythonu k určení zdroje, klíčů a sekvencování kanálu změn. Pokud chcete vytvořit cílovou streamovací tabulku, použijte příkaz CREATE OR REFRESH STREAMING TABLE v SQL nebo funkci create_streaming_table() v Pythonu. Viz scd typ 1 a typ 2 zpracování příklady.

Podrobnosti o syntaxi najdete v referenční příručce DLT SQL nebo Pythonu.

Jak se CDC implementuje s rozhraním API APPLY CHANGES FROM SNAPSHOT?

Důležitý

Rozhraní API APPLY CHANGES FROM SNAPSHOT je ve verzi Public Preview.

APPLY CHANGES FROM SNAPSHOT je deklarativní rozhraní API, které efektivně určuje změny ve zdrojových datech porovnáním série snímků v pořadí a poté provádí potřebné operace pro CDC záznamů ve snímcích. APPLY CHANGES FROM SNAPSHOT podporuje pouze rozhraní DLT Python.

APPLY CHANGES FROM SNAPSHOT podporuje ingestování snímků z více typů zdrojů:

  • K vkládání snímků z existující tabulky nebo zobrazení použijte periodické vkládání snímků. APPLY CHANGES FROM SNAPSHOT má jednoduché, zjednodušené rozhraní, které podporuje pravidelné ingestování snímků z existujícího databázového objektu. Při každé aktualizaci datového toku se zavádí nový snímek a jako verze snímku se použije čas příjmu dat. Při spuštění kanálu v průběžném režimu se ingestuje více snímků s každou aktualizací kanálu v období určeném nastavením intervalu aktivační události pro tok, který obsahuje zpracování SNÍMKU APPLY CHANGES FROM SNAPSHOT.
  • K zpracování souborů obsahujících snímky databáze, jako jsou snímky vygenerované z databáze Oracle nebo MySQL nebo datového skladu, použijte historický příjem snímků.

Pokud chcete provádět zpracování CDC z libovolného typu zdroje pomocí APPLY CHANGES FROM SNAPSHOT, nejprve vytvoříte streamovací tabulku a pak pomocí funkce apply_changes_from_snapshot() v Pythonu určíte snímek, klíče a další argumenty potřebné k implementaci zpracování. Podívejte se na příklady příjmu pravidelného snímků a příjmu historických snímků.

Snímky nahrané do rozhraní API musí být ve vzestupném pořadí podle verze. Pokud DLT zjistí snímek mimo pořadí, vyvolá se chyba.

Podrobnosti o syntaxi najdete v referenci DLT pro Python.

Omezení

Sloupec použitý pro třídění musí být tříditelný datový typ.

Příklad: Zpracování SCD typu 1 a SCD typu 2 se zdrojovými daty CDF

Následující části obsahují příklady dotazů DLT SCD typu 1 a typu 2, které aktualizují cílové tabulky na základě zdrojových událostí z datového kanálu změn, které:

  1. Vytvoří nové záznamy uživatelů.
  2. Odstraní záznam uživatele.
  3. Aktualizuje záznamy uživatelů. V příkladu typu SCD 1 se poslední operace označené jako UPDATE zpozdí a jsou vyřazeny z cílové tabulky, což demonstruje zpracování událostí mimo pořadí.

Následující příklady předpokládají znalost konfigurace a aktualizace kanálů DLT. Podívejte se na tutoriál: Spusťte svůj první DLT pipeline.

Abyste mohli tyto příklady spustit, musíte začít vytvořením ukázkové datové sady. Viz Generování testovacích dat.

Tady jsou vstupní záznamy pro tyto příklady:

userId Jméno město operace pořadové číslo
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lilie Cancun INSERT 2
123 nula nula VYMAZAT 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Čivava UPDATE 5

Pokud odkomentujete poslední řádek v ukázkových datech, vloží se následující záznam, který určuje, kam se mají záznamy zkrátit:

userId Jméno město operace Číslo sekvence
nula nula nula ZKRÁTIT 3

Poznámka

Všechny následující příklady zahrnují možnosti, jak zadat DELETE a TRUNCATE operace, ale každý z nich je volitelný.

Aktualizace SCD typu 1

Následující příklad ukazuje zpracování aktualizací typu SCD 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Po spuštění příkladu typu SCD 1 obsahuje cílová tabulka následující záznamy:

userId Jméno město
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lilie Cancun

Po spuštění příkladu typu SCD 1 s dalším záznamem TRUNCATE se záznamy 124 a 126 zkrátí kvůli operaci TRUNCATE v sequenceNum=3a cílová tabulka obsahuje následující záznam:

userId Jméno město
125 Mercedes Guadalajara

Aktualizace typu 2 pro SCD

Následující příklad ukazuje zpracování aktualizací typu 2 SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Po spuštění příkladu typu SCD 2 obsahuje cílová tabulka následující záznamy:

userId Jméno město __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Čivava 5 6
124 Raul Oaxaca 1 nula
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 nula
126 Lilie Cancun 2 nula

Dotaz TYPU 2 SCD může také určit podmnožinu výstupních sloupců, které se mají sledovat pro historii v cílové tabulce. Změny v jiných sloupcích se aktualizují místo generování nových záznamů historie. Následující příklad ukazuje vyloučení sloupce city ze sledování:

Následující příklad ukazuje použití historie sledování s SCD typu 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Po spuštění tohoto příkladu bez dalšího záznamu TRUNCATE obsahuje cílová tabulka následující záznamy:

userId Jméno město __START_AT __END_AT
123 Isabel Čivava 1 6
124 Raul Oaxaca 1 nula
125 Mercedes Guadalajara 2 nula
126 Lilie Cancun 2 nula

Generování testovacích dat

Níže uvedený kód slouží k vygenerování ukázkové datové sady pro použití v ukázkových dotazech, které jsou přítomné v tomto kurzu. Za předpokladu, že máte správné přihlašovací údaje k vytvoření nového schématu a vytvoření nové tabulky, můžete tyto příkazy spustit pomocí poznámkového bloku nebo Databricks SQL. Následující kód není zamýšlený ke spuštění jako součást potrubí DLT:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

příklad : Pravidelné zpracování snímků

Následující příklad ukazuje zpracování SCD typu 2, které zpracovává snímky tabulky uložené v mycatalog.myschema.mytable. Výsledky zpracování se zapisují do tabulky s názvem target.

mycatalog.myschema.mytable záznamy s časovým razítkem 2024-01-01 00:00:00

Klíč Hodnota
1 a1
2 a2

mycatalog.myschema.mytable záznamy s časovým razítkem 2024-01-01 12:00:00

Klíč Hodnota
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Po zpracování snímků obsahuje cílová tabulka následující záznamy:

Klíč Hodnota __START_AT __END_AT
1 a1 01.01.2024 00:00:00 2024-01-01 12:00:00
2 a2 01.01.2024 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 nula
3 a3 2024-01-01 12:00:00 nula

příklad : Historické zpracování snímků

Následující příklad ukazuje zpracování typu SCD 2, které aktualizuje cílovou tabulku na základě zdrojových událostí ze dvou snímků uložených v systému cloudového úložiště:

Snímek v timestampuložený v /<PATH>/filename1.csv

Klíč sledovací sloupec NesledovacíSloupec
1 a1 b1
2 a2 b2
4 a4 b4

Snímek pořízený v timestamp + 5, uložený v /<PATH>/filename2.csv

Klíč TrackingColumn SloupecBezSledování
2 a2_new b2
3 a3 b3
4 a4 b4_new

Následující příklad kódu ukazuje zpracování aktualizací SCD typu 2 pomocí těchto snímků:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Po zpracování snímků obsahuje cílová tabulka následující záznamy:

Klíč Sloupec sledování NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 nula
3 a3 b3 2 nula
4 a4 b4_new 1 nula

Přidání, změna nebo odstranění dat v cílové streamovací tabulce

Pokud váš datový kanál publikuje tabulky do Unity Catalogu, můžete použít příkazy jazyka pro manipulaci s daty (DML), včetně příkazů vložení, aktualizace, odstranění a sloučení, ke změně cílových streamovacích tabulek vytvořených příkazy APPLY CHANGES INTO.

Poznámka

  • Příkazy DML, které upravují schéma tabulky streamované tabulky, nejsou podporovány. Ujistěte se, že se příkazy DML nepokoušnou vyvíjet schéma tabulky.
  • Příkazy DML, které aktualizují streamovací tabulku, lze spustit výhradně ve sdíleném clusteru Unity Catalog nebo SQL warehouse pomocí Databricks Runtime 13.3 LTS a vyšších verzí.
  • Vzhledem k tomu, že streamování vyžaduje pouze připojované zdroje dat, vyžaduje-li vaše zpracování streamování ze zdrojové streamovací tabulky, která obsahuje změny (například prostřednictvím příkazů DML), nastavte příznak skipChangeCommits při čtení zdrojové streamovací tabulky. Při nastavení skipChangeCommits budou transakce, které odstraňují nebo upravují záznamy ve zdrojové tabulce, ignorovány. Pokud vaše zpracování nevyžaduje streamovací tabulku, můžete jako cílovou tabulku použít materializované zobrazení (které nemá omezení pouze pro přírůstky).

Vzhledem k tomu, že DLT používá zadaný sloupec SEQUENCE BY a rozšíří příslušné hodnoty sekvencování do __START_AT a __END_AT sloupců cílové tabulky (pro SCD typ 2), je nutné zajistit, aby příkazy DML používaly platné hodnoty pro tyto sloupce, aby zachovaly správné pořadí záznamů. Podívejte se na , jak je CDC implementováno pomocí rozhraní API APPLY CHANGES?.

Další informace o použití příkazů DML se streamovanými tabulkami najdete v tématu Přidání, změna nebo odstranění dat v streamované tabulce.

Následující příklad vloží aktivní záznam s počáteční sekvencí 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

čtení datového kanálu změn z cílové tabulky APPLY CHANGES

V Databricks Runtime 15.2 a vyšší můžete číst datový kanál změn ze streamovací tabulky, která je cílem APPLY CHANGES nebo APPLY CHANGES FROM SNAPSHOT dotazů stejným způsobem, jakým čtete datový kanál změn z jiných tabulek Delta. Pro čtení datového kanálu změn z cílové tabulky streamování jsou potřeba následující:

  • Cílová streamovací tabulka musí být publikovaná v katalogu Unity. Viz Použití katalogu Unity s kanály DLT.
  • Pokud chcete číst datový proud změn z cílové streamovací tabulky, musíte použít Databricks Runtime 15.2 nebo vyšší. Pokud chcete číst datový kanál změn v jiném kanálu DLT, musí být kanál nakonfigurovaný tak, aby používal Databricks Runtime 15.2 nebo vyšší.

Datový kanál změn si přečtete z cílové streamovací tabulky vytvořené v kanálu DLT stejným způsobem jako čtení datového kanálu změn z jiných tabulek Delta. Další informace o používání funkce změnového kanálu Delta, včetně příkladů v Pythonu a SQL, viz Použití datového kanálu změn Delta Lake v Azure Databricks.

Poznámka

Záznam datového kanálu změn obsahuje metadata identifikující typ události změny. Když se záznam aktualizuje v tabulce, metadata přidružených záznamů změn obvykle obsahují _change_type hodnoty nastavené na update_preimage a update_postimage události.

Hodnoty _change_type se ale liší, pokud se provedou aktualizace cílové streamingové tabulky, které zahrnují změnu hodnot primárního klíče. Pokud změny zahrnují aktualizace primárních klíčů, jsou pole metadat _change_type nastavena na insert a delete události. Ke změnám primárních klíčů může dojít v případě, že se ruční aktualizace pro jedno z polí klíčů provádějí pomocí příkazu UPDATE nebo MERGE nebo v případě tabulek typu SCD typu 2, když se pole __start_at změní tak, aby odráželo dřívější počáteční hodnotu sekvence.

Dotaz APPLY CHANGES určuje hodnoty primárního klíče, které se liší pro zpracování typu SCD 1 a SCD typu 2:

  • Pro zpracování typu SCD typu 1 a rozhraní DLT Python je primárním klíčem hodnota parametru keys ve funkci apply_changes(). Pro rozhraní DLT SQL je primární klíč sloupce definované klauzulí KEYS v příkazu APPLY CHANGES INTO.
  • U SCD typu 2 je primárním klíčem keys parametr nebo klauzule KEYS plus návratová hodnota z operace coalesce(__START_AT, __END_AT), kde __START_AT a __END_AT jsou odpovídající sloupce z cílové tabulky streamování.

Získání dat o záznamech zpracovaných dotazem DLT CDC

Poznámka

Následující metriky jsou zachyceny pouze APPLY CHANGES dotazy, nikoli dotazy APPLY CHANGES FROM SNAPSHOT.

Dotazy APPLY CHANGES zaznamenávají následující metriky:

  • num_upserted_rows: Počet výstupních řádků přenesených do datové sady během aktualizace.
  • num_deleted_rows: Počet existujících výstupních řádků odstraněných z datové sady během aktualizace.

Metrika num_output_rows, výstup pro toky bez CDC, se nezachytává pro dotazy apply changes.

Jaké datové objekty se používají ke zpracování DLT CDC?

Poznámka

  • Tyto datové struktury se vztahují pouze na zpracování APPLY CHANGES, nikoli na zpracování APPLY CHANGES FROM SNAPSHOT.
  • Tyto datové struktury platí pouze v případech, kdy je cílová tabulka publikovaná v metastoru Hive. Pokud kanál publikuje do katalogu Unity, jsou interní záložní tabulky pro uživatele nepřístupné.

Když deklarujete cílovou tabulku v metastoru Hive, vytvoří se dvě datové struktury:

  • Zobrazení s názvem přiřazeným k cílové tabulce.
  • Interní podpůrná tabulka používaná DLT pro správu zpracování CDC. Tato tabulka je pojmenovaná tak, že se k názvu cílové tabulky připojí __apply_changes_storage_ jako předpona.

Pokud například deklarujete cílovou tabulku s názvem dlt_cdc_target, zobrazí se v metastoru zobrazení s názvem dlt_cdc_target a tabulka s názvem __apply_changes_storage_dlt_cdc_target. Vytvoření zobrazení umožňuje knihovně DLT vyfiltrovat dodatečné informace (například náhrobky a verze) potřebné ke zpracování dat mimo pořadí. Pokud chcete zobrazit zpracovávaná data, zadejte dotaz na cílové zobrazení. Vzhledem k tomu, že schéma tabulky __apply_changes_storage_ se může změnit tak, aby podporovalo budoucí funkce nebo vylepšení, neměli byste se dotazovat na tabulku pro produkční použití. Pokud do tabulky přidáte data ručně, předpokládá se, že záznamy přicházejí před dalšími změnami, protože sloupce verze chybí.