API do zastosowania zmian: Uproszczone przechwytywanie zmian danych za pomocą tabel Delta Live.
Funkcja Delta Live Tables upraszcza przechwytywanie danych zmieniających się (CDC) za pomocą interfejsów API APPLY CHANGES
i APPLY CHANGES FROM SNAPSHOT
. Używany interfejs zależy od źródła danych zmian:
- Służy
APPLY CHANGES
do przetwarzania zmian ze źródła danych zmian (CDF). - Użyj
APPLY CHANGES FROM SNAPSHOT
funkcji (publiczna wersja zapoznawcza), aby przetworzyć zmiany w migawkach bazy danych.
Wcześniej instrukcja MERGE INTO
była często używana do przetwarzania rekordów CDC w usłudze Azure Databricks.
MERGE INTO
Może jednak generować nieprawidłowe wyniki z powodu rekordów poza sekwencją lub wymaga złożonej logiki ponownej kolejności rekordów.
Interfejs API APPLY CHANGES
jest obsługiwany w interfejsach SQL i Python w ramach Delta Live Tables. Interfejs API APPLY CHANGES FROM SNAPSHOT
jest obsługiwany w interfejsie Pythona Delta Live Tables.
Zarówno APPLY CHANGES
, jak i APPLY CHANGES FROM SNAPSHOT
obsługują aktualizowanie tabel przy użyciu typu SCD 1 i typu 2:
- Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla zaktualizowanych rekordów.
- Użyj typu SCD 2, aby zachować historię rekordów we wszystkich aktualizacjach lub aktualizacjach określonego zestawu kolumn.
Aby uzyskać informacje o składni i innych odwołaniach, zobacz:
- Przechwytywanie zmian danych z kanału zmian za pomocą języka Python w Delta Live Tables
- Przechwytywanie zmian danych za pomocą SQL w Delta Live Tables
Uwaga
W tym artykule opisano sposób aktualizowania tabel w potoku Delta Live Tables na podstawie zmian w danych źródłowych. Aby dowiedzieć się, jak rejestrować i wykonywać zapytania dotyczące zmian na poziomie wiersza dla tabel delty, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).
Wymagania
Aby korzystać z interfejsów API usługi CDC, potok musi być skonfigurowany do używania potoków bezserwerowych bibliotek DLLT lub funkcji Delta Live Tables Pro
lub edycji Advanced
.
Jak usługa CDC jest implementowana za pomocą interfejsu APPLY CHANGES
API?
Dzięki automatycznej obsłudze rekordów poza sekwencją interfejs API APPLY CHANGES
w tabelach Delta Live Tables zapewnia prawidłowe przetwarzanie rekordów CDC i eliminuje konieczność opracowania złożonej logiki do obsługi rekordów poza sekwencją. Należy określić kolumnę w danych źródłowych do sekwencjonowania rekordów, co Delta Live Tables interpretuje jako monotonicznie rosnącą reprezentację właściwej kolejności danych źródłowych. Delta Live Tables automatycznie obsługuje dane, które przybywają w nieoczekiwanej kolejności. W przypadku zmian typu SCD 2, Delta Live Tables propaguje odpowiednie wartości sekwencjonowania do kolumn __START_AT
i __END_AT
tabeli docelowej. Każda wartość sekwencjonowania powinna zawierać jedną odrębną aktualizację na klucz, a wartości sekwencjonowania NULL nie są obsługiwane.
Aby wykonać przetwarzanie CDC za pomocą APPLY CHANGES
, należy najpierw utworzyć tabelę streamingową, a następnie użyć instrukcji APPLY CHANGES INTO
w języku SQL lub funkcji apply_changes()
w języku Python, aby określić źródło, klucze i sekwencjonowanie dla strumienia zmian. Aby utworzyć docelową tabelę przesyłania strumieniowego, użyj instrukcji CREATE OR REFRESH STREAMING TABLE
w języku SQL lub funkcji create_streaming_table()
w języku Python.
Zobacz przykłady przetwarzania typu SCD 1 i typ 2.
Aby uzyskać szczegółowe informacje o składni, zobacz odniesienie SQL Delta Live Tables lub odniesienie do Pythona .
Jak usługa CDC jest implementowana za pomocą interfejsu APPLY CHANGES FROM SNAPSHOT
API?
Ważne
Interfejs APPLY CHANGES FROM SNAPSHOT
API jest w publicznej wersji zapoznawczej.
APPLY CHANGES FROM SNAPSHOT
to deklaratywny interfejs API, który efektywnie określa zmiany w danych źródłowych, porównując serię migawek w kolejności, a następnie uruchamia przetwarzanie wymagane do przetwarzania cdC rekordów w migawkach.
APPLY CHANGES FROM SNAPSHOT
jest obsługiwany tylko przez interfejs Python Delta Live Tables.
APPLY CHANGES FROM SNAPSHOT
obsługuje pozyskiwanie migawek z wielu typów źródłowych:
- Użyj okresowego pozyskiwania migawek, aby pobrać migawki z istniejącej tabeli lub widoku.
APPLY CHANGES FROM SNAPSHOT
Ma prosty, usprawniony interfejs do obsługi okresowego pozyskiwania migawek z istniejącego obiektu bazy danych. Nowa migawka jest pozyskiwana wraz z każdą aktualizacją potoku, a czas pozyskiwania jest używany jako wersja migawki. Gdy potok jest uruchamiany w trybie ciągłym, wiele migawek jest pozyskiwanych przy użyciu każdej aktualizacji potoku w okresie określonym przez interwał wyzwalacza ustawienie przepływu zawierającego zastosowanie zmian z przetwarzania migawki. - Pozyskiwanie historycznych migawek służy do przetwarzania plików zawierających migawki bazy danych, takich jak migawki wygenerowane na podstawie bazy danych Oracle lub MySQL albo magazynu danych.
Aby wykonać przetwarzanie CDC z dowolnego typu źródła z użyciem APPLY CHANGES FROM SNAPSHOT
, należy najpierw utworzyć tabelę strumieniową, a następnie użyć funkcji apply_changes_from_snapshot()
w Pythonie, aby określić migawkę, klucze i inne argumenty wymagane do zaimplementowania przetwarzania.
Zobacz przykłady okresowego pozyskiwania migawek i historycznego pozyskiwania migawek.
Migawki przekazywane do interfejsu API muszą być w kolejności rosnącej według wersji. Jeśli funkcja Delta Live Tables wykryje migawkę poza kolejnością, zostanie zgłoszony błąd.
Aby uzyskać szczegółowe informacje o składni, zobacz Delta Live Tables Python reference.
Ograniczenia
Kolumna używana do sekwencjonowania musi być sortowalnym typem danych.
Przykład: typ SCD 1 i typ SCD 2 przetwarzania z danymi źródłowymi CDF
W poniższych sekcjach przedstawiono przykłady zapytań Delta Live Tables typu SCD 1 i 2, które aktualizują tabele docelowe na podstawie zdarzeń źródłowych z kanału strumienia danych zmiany, które:
- Tworzy nowe rekordy użytkowników.
- Usuwa rekord użytkownika.
- Aktualizuje rekordy użytkowników. W przykładzie SCD typu 1, ostatnie operacje
UPDATE
docierają późno i są usuwane z tabeli docelowej, co pokazuje obsługę zdarzeń poza kolejnością.
Poniższe przykłady zakładają znajomość konfigurowania i aktualizowania potoków Delta Live Tables. Zobacz Samouczek: Uruchamianie pierwszego potoku Delta Live Tables.
Aby uruchomić te przykłady, musisz zacząć od utworzenia przykładowego zestawu danych. Zobacz Generowanie danych testowych.
Poniżej przedstawiono rekordy wejściowe dla tych przykładów:
Identyfikator użytkownika | name | miejscowość | rozdzielnicy | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lilia | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Jeśli odkomentujesz ostatni wiersz w przykładowych danych, zostanie dodany następujący rekord, który określa, w którym miejscu rekordy powinny zostać obcięte.
Identyfikator użytkownika | name | miejscowość | rozdzielnicy | sequenceNum |
---|---|---|---|---|
null | null | null | OBCIĄĆ | 3 |
Uwaga
Wszystkie poniższe przykłady obejmują opcje określania operacji DELETE
i TRUNCATE
, ale każdy z nich jest opcjonalny.
Przetwarzanie aktualizacji typu SCD 1
W poniższym przykładzie pokazano przetwarzanie aktualizacji typu 1 protokołu SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lilia | Cancun |
Po uruchomieniu przykładu typu SCD 1 z dodatkowym rekordem TRUNCATE
rekordy 124
i 126
są obcinane z powodu operacji TRUNCATE
w sequenceNum=3
, a tabela docelowa zawiera następujący rekord:
Identyfikator użytkownika | name | miejscowość |
---|---|---|
125 | Mercedes | Guadalajara |
Przetwarzanie aktualizacji typu SCD 2
W poniższym przykładzie pokazano przetwarzanie aktualizacji typu 2 protokołu SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Po uruchomieniu przykładowego typu SCD 2 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lilia | Cancun | 2 | null |
Zapytanie typu SCD 2 może również określać podzestaw kolumn wyjściowych do śledzenia historii w tabeli docelowej. Zmiany w innych kolumnach są aktualizowane bezpośrednio, zamiast generować nowe rekordy historii. W poniższym przykładzie pokazano wykluczenie kolumny city
ze śledzenia:
W poniższym przykładzie pokazano użycie historii śledzenia z typem SCD 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Po uruchomieniu tego przykładu bez dodatkowego rekordu TRUNCATE
tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lilia | Cancun | 2 | null |
Generowanie danych testowych
Poniższy kod służy do generowania przykładowego zestawu danych do użycia w przykładowych zapytaniach znajdujących się w tym samouczku. Zakładając, że masz odpowiednie poświadczenia do utworzenia nowego schematu i utworzenia nowej tabeli, możesz uruchomić te instrukcje za pomocą notesu lub bazy danych Databricks SQL. Poniższy kod jest nie, który ma być uruchamiany jako część potoku delta live tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Przykład: Okresowe przetwarzanie migawek
W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które pozyskuje migawki tabeli przechowywanej w mycatalog.myschema.mytable
. Wyniki przetwarzania są zapisywane w tabeli o nazwie target
.
mycatalog.myschema.mytable
rekordy w znaczniku czasu 2024-01-01 00:00:00
Key | Wartość |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
rekordy w znaczniku czasu 2024-01-01 12:00:00
Key | Wartość |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:
Key | Wartość | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
Przykład: Historyczne przetwarzanie migawek
W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które aktualizuje tabelę docelową na podstawie zdarzeń źródłowych z dwóch migawek przechowywanych w systemie magazynu w chmurze:
Migawka w lokalizacji timestamp
, przechowywana w /<PATH>/filename1.csv
Key | Kolumna śledzenia | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
100 | a4 | b4 |
Migawka w lokalizacji timestamp + 5
, przechowywana w /<PATH>/filename2.csv
Key | Kolumna śledzenia | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
100 | a4 | b4_new |
W poniższym przykładzie kodu pokazano przetwarzanie aktualizacji typu 2 protokołu SCD przy użyciu tych migawek:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:
Key | Kolumna śledzenia | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
100 | a4 | b4_new | 1 | null |
Dodawanie, zmienianie lub usuwanie danych w docelowej tabeli przesyłania strumieniowego
Jeśli potok publikuje tabele w Unity Catalog, możesz użyć instrukcji języka manipulowania danymi (DML), w tym instrukcji wstawiania, aktualizowania, usuwania i scalania, aby zmodyfikować docelowe tabele przesyłania strumieniowego utworzone przez instrukcje APPLY CHANGES INTO
.
Uwaga
- Instrukcje DML modyfikujące schemat tabeli przesyłania strumieniowego nie są obsługiwane. Upewnij się, że instrukcje DML nie próbują rozwijać schematu tabeli.
- Instrukcje DML, które aktualizują tabelę strumieniową, mogą być uruchamiane tylko w udostępnionym klastrze Unity Catalog lub w SQL Warehouse przy użyciu Databricks Runtime 13.3 LTS lub nowszego.
- Ponieważ przesyłanie strumieniowe wymaga źródeł danych, które można tylko dołączać, jeśli przetwarzanie wymaga przesyłania strumieniowego ze źródłowej tabeli przesyłania strumieniowego z wprowadzanymi zmianami (na przykład za pomocą instrukcji DML), ustaw flagę skipChangeCommits podczas odczytywania źródłowej tabeli strumieniowej. Po ustawieniu
skipChangeCommits
transakcje, które usuwają lub modyfikują rekordy w tabeli źródłowej, są ignorowane. Jeśli przetwarzanie nie wymaga tabeli przesyłania strumieniowego, możesz użyć zmaterializowanego widoku (który nie ma ograniczenia tylko do dołączania) jako tabeli docelowej.
Ponieważ Delta Live Tables używają określonej kolumny SEQUENCE BY
i propagują odpowiednie wartości sekwencjonowania do kolumn __START_AT
i __END_AT
tabeli docelowej (dla typu SCD 2), należy upewnić się, że instrukcje DML używają prawidłowych wartości dla tych kolumn, aby zachować właściwą kolejność rekordów. Zobacz Jak usługa CDC jest implementowana za pomocą interfejsu API APPLY CHANGES?.
Aby uzyskać więcej informacji na temat używania instrukcji DML z tabelami przesyłania strumieniowego, zobacz Dodawanie, zmienianie lub usuwanie danych w tabeli przesyłania strumieniowego.
Poniższy przykład wstawia aktywny rekord z sekwencją początkową 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Odczytywanie strumienia danych zmian z tabeli docelowej APPLY CHANGES
W środowisku Databricks Runtime 15.2 lub nowszym można odczytać strumień danych zmian z tabeli strumieniowej, która jest celem zapytań APPLY CHANGES
lub APPLY CHANGES FROM SNAPSHOT
, w taki sam sposób, jak odczytywanie strumienia danych zmian z innych tabel Delta. Aby odczytać strumień danych zmian z docelowej tabeli strumieniowej, wymagane są następujące elementy:
- Docelowa tabela strumieniowa musi zostać opublikowana w Unity Catalog. Zobacz , jak używać katalogu Unity z potokami Delta Live Tables.
- Aby odczytać zestawienie zmian danych z docelowej tabeli przesyłania strumieniowego, musisz użyć środowiska Databricks Runtime 15.2 lub nowszego. Aby odczytać zestawienie danych zmian w innym potoku Delta Live Tables, potok musi być skonfigurowany do używania środowiska Databricks Runtime 15.2 lub nowszego.
Odczytasz źródło danych zmian z docelowej tabeli przesyłania strumieniowego, która została utworzona w potoku Delta Live Tables, tak samo jak źródło danych zmian z innych tabel Delta. Aby dowiedzieć się więcej na temat korzystania z funkcji źródła danych zmian różnicowych, w tym przykładów w językach Python i SQL, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).
Uwaga
Rekord zestawienia danych zmian zawiera metadane identyfikujące typ zdarzenia zmiany. Po zaktualizowaniu rekordu w tabeli, metadane powiązanych rekordów zmian zazwyczaj zawierają wartości _change_type
ustawione na update_preimage
oraz zdarzenia update_postimage
.
Jednak wartości _change_type
są różne, jeśli do docelowej tabeli streamingowej wprowadzane są aktualizacje obejmujące zmianę wartości klucza podstawowego. Gdy zmiany obejmują aktualizacje kluczy podstawowych, pola metadanych _change_type
są ustawione na wydarzenia insert
i delete
. Zmiany kluczy podstawowych mogą wystąpić, gdy ręczne aktualizacje są wprowadzane do jednego z pól klucza z instrukcją UPDATE
lub MERGE
lub w przypadku tabel typu SCD 2, gdy pole __start_at
zmieni się w celu odzwierciedlenia wcześniejszej wartości sekwencji początkowej.
Zapytanie APPLY CHANGES
określa wartości klucza podstawowego, które różnią się w przypadku przetwarzania typu SCD 1 i SCD typu 2:
- W przypadku przetwarzania typu SCD 1 i interfejsu języka Python Tabel Delta Live Tables klucz podstawowy jest wartością parametru
keys
w funkcjiapply_changes()
. W przypadku interfejsu SQL usługi Delta Live Tables klucz podstawowy to kolumny zdefiniowane przez klauzulęKEYS
w instrukcjiAPPLY CHANGES INTO
. - W przypadku typu SCD 2 klucz podstawowy jest parametrem
keys
lub klauzuląKEYS
oraz wartością zwracaną z operacjicoalesce(__START_AT, __END_AT)
, gdzie__START_AT
i__END_AT
są odpowiednimi kolumnami z docelowej tabeli przesyłania strumieniowego.
Pobierz dane dotyczące rekordów przetwarzanych przez zapytanie CDC Delta Live Tables
Uwaga
Poniższe metryki są przechwytywane tylko przez APPLY CHANGES
zapytania, a nie zapytania APPLY CHANGES FROM SNAPSHOT
.
Następujące metryki są przechwytywane przez APPLY CHANGES
zapytania:
-
num_upserted_rows
: liczba wierszy wyjściowych została przeładowana do zestawu danych podczas aktualizacji. -
num_deleted_rows
: liczba istniejących wierszy wyjściowych usuniętych z zestawu danych podczas aktualizacji.
num_output_rows
Metryka, dane wyjściowe dla przepływów innych niż CDC, nie są przechwytywane dla apply changes
zapytań.
Jakie obiekty danych są używane do przetwarzania Delta Live Tables CDC?
Uwaga
- Te struktury danych dotyczą tylko przetwarzania
APPLY CHANGES
, a nie przetwarzaniaAPPLY CHANGES FROM SNAPSHOT
. - Te struktury danych mają zastosowanie tylko wtedy, gdy tabela docelowa zostanie opublikowana w magazynie metadanych Hive. Jeśli potok publikuje w Unity Catalog, wewnętrzne tabele wspierające są niedostępne dla użytkowników.
Po zadeklarowaniu tabeli docelowej w magazynie metadanych Hive tworzone są dwie struktury danych:
- Widok używający nazwy przypisanej do tabeli docelowej.
- Wewnętrzna tabela pomocnicza używana przez Delta Live Tables do zarządzania przetwarzaniem CDC. Nazwa tej tabeli powstaje przez dodanie przedrostka
__apply_changes_storage_
do nazwy tabeli docelowej.
Jeśli na przykład zadeklarujesz tabelę docelową o nazwie dlt_cdc_target
, w magazynie metadanych zostanie wyświetlony widok o nazwie dlt_cdc_target
i tabela o nazwie __apply_changes_storage_dlt_cdc_target
. Utworzenie widoku umożliwia funkcji Delta Live Tables filtrowanie dodatkowych informacji (na przykład grobowce i wersje) wymaganych do obsługi danych poza kolejnością. Aby wyświetlić przetworzone dane, wykonaj zapytanie względem widoku docelowego. Ponieważ schemat tabeli __apply_changes_storage_
może ulec zmianie w celu obsługi przyszłych funkcji lub ulepszeń, nie należy wykonywać zapytań dotyczących tabeli do użytku produkcyjnego. W przypadku ręcznego dodawania danych do tabeli zakłada się, że rekordy zostaną wprowadzone przed innymi zmianami, ponieważ brakuje kolumn wersji.