Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo
Funkcja Delta Live Tables upraszcza przechwytywanie zmian danych (CDC) przy użyciu interfejsów APPLY CHANGES
API i APPLY CHANGES FROM SNAPSHOT
. Używany interfejs zależy od źródła danych zmian:
- Służy
APPLY CHANGES
do przetwarzania zmian ze źródła danych zmian (CDF). - Użyj
APPLY CHANGES FROM SNAPSHOT
funkcji (publiczna wersja zapoznawcza), aby przetworzyć zmiany w migawkach bazy danych.
Wcześniej instrukcja MERGE INTO
była często używana do przetwarzania rekordów CDC w usłudze Azure Databricks. MERGE INTO
Może jednak generować nieprawidłowe wyniki z powodu rekordów poza sekwencją lub wymaga złożonej logiki ponownej kolejności rekordów.
Interfejs APPLY CHANGES
API jest obsługiwany w interfejsach DELTA Live Tables SQL i Python. Interfejs APPLY CHANGES FROM SNAPSHOT
API jest obsługiwany w interfejsie języka Python tabel delta Live Tables.
Zarówno, jak APPLY CHANGES
i APPLY CHANGES FROM SNAPSHOT
obsługują aktualizowanie tabel przy użyciu typu SCD 1 i typu 2:
- Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla zaktualizowanych rekordów.
- Użyj typu SCD 2, aby zachować historię rekordów we wszystkich aktualizacjach lub aktualizacjach określonego zestawu kolumn.
Aby uzyskać informacje o składni i innych odwołaniach, zobacz:
- Zmienianie przechwytywania danych z zestawienia zmian za pomocą języka Python w tabelach delta live tables
- Zmienianie przechwytywania danych za pomocą bazy danych SQL w tabelach delta live
Uwaga
W tym artykule opisano sposób aktualizowania tabel w potoku delta Live Tables na podstawie zmian w danych źródłowych. Aby dowiedzieć się, jak rejestrować i wykonywać zapytania dotyczące zmian na poziomie wiersza dla tabel delty, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).
Wymagania
Aby korzystać z interfejsów API usługi CDC, potok musi być skonfigurowany do używania bezserwerowych potoków DLT lub tabel delta Live Tables Pro
lub Advanced
edycji.
Jak usługa CDC jest implementowana za pomocą interfejsu APPLY CHANGES
API?
Dzięki automatycznej obsłudze rekordów poza sekwencją interfejs APPLY CHANGES
API w tabelach Delta Live Tables zapewnia poprawne przetwarzanie rekordów CDC i eliminuje konieczność opracowania złożonej logiki do obsługi rekordów poza sekwencją. Należy określić kolumnę w danych źródłowych, na których mają być sekwencjonujące rekordy, które tabele Delta Live Tables interpretują jako monotonicznie zwiększającą reprezentację prawidłowego porządku danych źródłowych. Delta Live Tables automatycznie obsługuje dane odbierane z zamówienia. W przypadku zmian typu SCD 2 tabele delta live propagują odpowiednie wartości sekwencjonowania do kolumn i __END_AT
tabeli __START_AT
docelowej. Każda wartość sekwencjonowania powinna zawierać jedną odrębną aktualizację na klucz, a wartości sekwencjonowania NULL nie są obsługiwane.
Aby wykonać przetwarzanie APPLY CHANGES
CDC za pomocą polecenia , należy najpierw utworzyć tabelę przesyłania strumieniowego, a następnie użyć APPLY CHANGES INTO
instrukcji w języku SQL lub apply_changes()
funkcji w języku Python, aby określić źródło, klucze i sekwencjonowanie zestawienia zmian. Aby utworzyć docelową tabelę przesyłania strumieniowego, użyj CREATE OR REFRESH STREAMING TABLE
instrukcji w języku SQL lub create_streaming_table()
funkcji w języku Python. Zobacz przykłady przetwarzania typu SCD 1 i typ 2.
Aby uzyskać szczegółowe informacje o składni, zobacz dokumentację SQL tabel delta Live Tables lub dokumentację języka Python.
Jak usługa CDC jest implementowana za pomocą interfejsu APPLY CHANGES FROM SNAPSHOT
API?
Ważne
Interfejs APPLY CHANGES FROM SNAPSHOT
API jest w publicznej wersji zapoznawczej.
APPLY CHANGES FROM SNAPSHOT
to deklaratywny interfejs API, który efektywnie określa zmiany w danych źródłowych, porównując serię migawek w kolejności, a następnie uruchamia przetwarzanie wymagane do przetwarzania cdC rekordów w migawkach. APPLY CHANGES FROM SNAPSHOT
jest obsługiwany tylko przez interfejs języka Python tabel delta Live Tables.
APPLY CHANGES FROM SNAPSHOT
obsługuje pozyskiwanie migawek z wielu typów źródłowych:
- Użyj okresowego pozyskiwania migawek do pozyskiwania migawek z istniejącej tabeli lub widoku.
APPLY CHANGES FROM SNAPSHOT
Ma prosty, usprawniony interfejs do obsługi okresowego pozyskiwania migawek z istniejącego obiektu bazy danych. Nowa migawka jest pozyskiwana wraz z każdą aktualizacją potoku, a czas pozyskiwania jest używany jako wersja migawki. Gdy potok jest uruchamiany w trybie ciągłym, wiele migawek jest pozyskiwanych z każdą aktualizacją potoku w okresie określonym przez ustawienie interwału wyzwalacza dla przepływu zawierającego zastosowanie zmian z przetwarzania migawki. - Pozyskiwanie historycznych migawek służy do przetwarzania plików zawierających migawki bazy danych, takich jak migawki wygenerowane na podstawie bazy danych Oracle lub MySQL albo magazynu danych.
Aby wykonać przetwarzanie CDC z dowolnego typu APPLY CHANGES FROM SNAPSHOT
źródła za pomocą polecenia , należy najpierw utworzyć tabelę przesyłania strumieniowego, a następnie użyć apply_changes_from_snapshot()
funkcji w języku Python do określenia migawki, kluczy i innych argumentów wymaganych do zaimplementowania przetwarzania. Zobacz przykłady okresowego pozyskiwania migawek i historycznego pozyskiwania migawek.
Migawki przekazywane do interfejsu API muszą być w kolejności rosnącej według wersji. Jeśli funkcja Delta Live Tables wykryje migawkę poza kolejnością, zostanie zgłoszony błąd.
Aby uzyskać szczegółowe informacje o składni, zobacz dokumentację języka Python tabel delta live.
Ograniczenia
Kolumna używana do sekwencjonowania musi być sortowalnym typem danych.
Przykład: typ SCD 1 i typ SCD 2 przetwarzania z danymi źródłowymi CDF
W poniższych sekcjach przedstawiono przykłady różnicowych tabel na żywo typu SCD typu 1 i 2 zapytania aktualizujące tabele docelowe na podstawie zdarzeń źródłowych ze źródła danych, które:
- Tworzy nowe rekordy użytkowników.
- Usuwa rekord użytkownika.
- Aktualizuje rekordy użytkowników. W przykładzie scD typu 1 ostatnie
UPDATE
operacje docierają późno i są usuwane z tabeli docelowej, pokazując obsługę zdarzeń poza kolejnością.
W poniższych przykładach założono, że znajomość konfigurowania i aktualizowania potoków tabel na żywo delty. Zobacz Samouczek: uruchamianie pierwszego potoku delty tabel na żywo.
Aby uruchomić te przykłady, musisz zacząć od utworzenia przykładowego zestawu danych. Zobacz Generowanie danych testowych.
Poniżej przedstawiono rekordy wejściowe dla tych przykładów:
Identyfikator użytkownika | name | miejscowość | rozdzielnicy | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lilia | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
Jeśli usuniesz komentarz z końcowego wiersza w przykładowych danych, wstawi następujący rekord, który określa, gdzie rekordy powinny być obcięte:
Identyfikator użytkownika | name | miejscowość | rozdzielnicy | sequenceNum |
---|---|---|---|---|
null | null | null | OBCIĄĆ | 3 |
Uwaga
Wszystkie poniższe przykłady obejmują opcje określania operacji DELETE
i TRUNCATE
, ale każdy z nich jest opcjonalny.
Przetwarzanie aktualizacji typu SCD 1
W poniższym przykładzie pokazano przetwarzanie aktualizacji typu 1 protokołu SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lilia | Cancun |
Po uruchomieniu przykładowego typu SCD 1 z dodatkowym TRUNCATE
rekordem rekordy 124
i 126
są obcinane z TRUNCATE
powodu operacji w sequenceNum=3
lokalizacji , a tabela docelowa zawiera następujący rekord:
Identyfikator użytkownika | name | miejscowość |
---|---|---|
125 | Mercedes | Guadalajara |
Przetwarzanie aktualizacji typu SCD 2
W poniższym przykładzie pokazano przetwarzanie aktualizacji typu 2 protokołu SCD:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
Po uruchomieniu przykładowego typu SCD 2 tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lilia | Cancun | 2 | null |
Zapytanie typu SCD 2 może również określać podzestaw kolumn wyjściowych do śledzenia historii w tabeli docelowej. Zmiany w innych kolumnach są aktualizowane zamiast generowania nowych rekordów historii. W poniższym przykładzie pokazano wykluczenie kolumny city
ze śledzenia:
W poniższym przykładzie pokazano użycie historii śledzenia z typem SCD 2:
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
Po uruchomieniu tego przykładu bez dodatkowego TRUNCATE
rekordu tabela docelowa zawiera następujące rekordy:
Identyfikator użytkownika | name | miejscowość | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lilia | Cancun | 2 | null |
Generowanie danych testowych
Poniższy kod służy do generowania przykładowego zestawu danych do użycia w przykładowych zapytaniach znajdujących się w tym samouczku. Zakładając, że masz odpowiednie poświadczenia do utworzenia nowego schematu i utworzenia nowej tabeli, możesz uruchomić te instrukcje za pomocą notesu lub bazy danych Databricks SQL. Poniższy kod nie jest przeznaczony do uruchomienia w ramach potoku delta Live Tables:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
Przykład: Okresowe przetwarzanie migawek
W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które pozyskuje migawki tabeli przechowywanej w lokalizacji mycatalog.myschema.mytable
. Wyniki przetwarzania są zapisywane w tabeli o nazwie target
.
mycatalog.myschema.mytable
rekordy w znaczniku czasu 2024-01-01 00:00:00
Key | Wartość |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
rekordy w znaczniku czasu 2024-01-01 12:00:00
Key | Wartość |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:
Key | Wartość | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
Przykład: Historyczne przetwarzanie migawek
W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które aktualizuje tabelę docelową na podstawie zdarzeń źródłowych z dwóch migawek przechowywanych w systemie magazynu w chmurze:
Migawka w lokalizacji timestamp
, przechowywana w /<PATH>/filename1.csv
Klucz | Kolumna śledzenia | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
100 | a4 | b4 |
Migawka w lokalizacji timestamp + 5
, przechowywana w /<PATH>/filename2.csv
Klucz | Kolumna śledzenia | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
100 | a4 | b4_new |
W poniższym przykładzie kodu pokazano przetwarzanie aktualizacji typu 2 protokołu SCD przy użyciu tych migawek:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:
Klucz | Kolumna śledzenia | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
100 | a4 | b4_new | 1 | null |
Dodawanie, zmienianie lub usuwanie danych w docelowej tabeli przesyłania strumieniowego
Jeśli potok publikuje tabele w wykazie aparatu Unity, możesz użyć instrukcji języka manipulowania danymi (DML), w tym wstawiania, aktualizowania, usuwania i scalania instrukcji, aby modyfikować docelowe tabele przesyłania strumieniowego utworzone przez APPLY CHANGES INTO
instrukcje.
Uwaga
- Instrukcje DML modyfikujące schemat tabeli przesyłania strumieniowego nie są obsługiwane. Upewnij się, że instrukcje DML nie próbują rozwijać schematu tabeli.
- Instrukcje DML, które aktualizują tabelę przesyłania strumieniowego, mogą być uruchamiane tylko w udostępnionym klastrze wykazu aparatu Unity lub w usłudze SQL Warehouse przy użyciu środowiska Databricks Runtime 13.3 LTS lub nowszego.
- Ponieważ przesyłanie strumieniowe wymaga źródeł danych tylko do dołączania, jeśli przetwarzanie wymaga przesyłania strumieniowego ze źródłowej tabeli przesyłania strumieniowego ze zmianami (na przykład instrukcjami DML), ustaw flagę skipChangeCommits podczas odczytywania źródłowej tabeli przesyłania strumieniowego. Po
skipChangeCommits
ustawieniu transakcje, które usuwają lub modyfikują rekordy w tabeli źródłowej, są ignorowane. Jeśli przetwarzanie nie wymaga tabeli przesyłania strumieniowego, możesz użyć zmaterializowanego widoku (który nie ma ograniczenia tylko do dołączania) jako tabeli docelowej.
Ponieważ tabele delta live używa określonej SEQUENCE BY
kolumny i propagują odpowiednie wartości sekwencjonowania do __START_AT
kolumn i __END_AT
tabeli docelowej (dla typu SCD 2), należy upewnić się, że instrukcje DML używają prawidłowych wartości dla tych kolumn, aby zachować właściwą kolejność rekordów. Zobacz Jak usługa CDC jest implementowana za pomocą interfejsu API APPLY CHANGES?.
Aby uzyskać więcej informacji na temat używania instrukcji DML z tabelami przesyłania strumieniowego, zobacz Dodawanie, zmienianie lub usuwanie danych w tabeli przesyłania strumieniowego.
Poniższy przykład wstawia aktywny rekord z sekwencją początkową 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
Odczytywanie zestawienia danych zmian z APPLY CHANGES
tabeli docelowej
W środowisku Databricks Runtime 15.2 lub nowszym można odczytać zestawienie danych zmian z tabeli przesyłania strumieniowego, która jest celem APPLY CHANGES
zapytania lub APPLY CHANGES FROM SNAPSHOT
w taki sam sposób, jak odczytywanie zestawienia zmian danych z innych tabel delty. Aby odczytać źródło danych zmian z docelowej tabeli przesyłania strumieniowego, wymagane są następujące elementy:
- Docelowa tabela przesyłania strumieniowego musi zostać opublikowana w wykazie aparatu Unity. Zobacz Używanie wykazu aparatu Unity z potokami delta Live Tables.
- Aby odczytać zestawienie zmian danych z docelowej tabeli przesyłania strumieniowego, musisz użyć środowiska Databricks Runtime 15.2 lub nowszego. Aby odczytać zestawienie danych zmian w innym potoku delta Live Tables, potok musi być skonfigurowany do używania środowiska Databricks Runtime 15.2 lub nowszego.
Odczytasz źródło danych zmian z docelowej tabeli przesyłania strumieniowego, która została utworzona w potoku delta live tables, tak samo jak odczytywanie zestawienia danych zmian z innych tabel delty. Aby dowiedzieć się więcej na temat korzystania z funkcji źródła danych zmian różnicowych, w tym przykładów w językach Python i SQL, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).
Uwaga
Rekord zestawienia danych zmian zawiera metadane identyfikujące typ zdarzenia zmiany. Po zaktualizowaniu rekordu w tabeli metadane skojarzonych rekordów zmian zwykle zawierają _change_type
wartości ustawione na update_preimage
i update_postimage
zdarzenia.
Jednak wartości są różne, _change_type
jeśli aktualizacje są wprowadzane do docelowej tabeli przesyłania strumieniowego, która obejmuje zmianę wartości klucza podstawowego. Gdy zmiany obejmują aktualizacje kluczy podstawowych, _change_type
pola metadanych są ustawione na insert
i delete
zdarzenia. Zmiany kluczy podstawowych mogą wystąpić, gdy aktualizacje ręczne są wprowadzane do jednego z pól klucza z UPDATE
instrukcją lub MERGE
, w przypadku tabel typu SCD 2, gdy __start_at
pole zmieni się w celu odzwierciedlenia wcześniejszej wartości sekwencji początkowej.
Zapytanie APPLY CHANGES
określa wartości klucza podstawowego, które różnią się w przypadku przetwarzania typu SCD 1 i SCD typu 2:
- W przypadku przetwarzania typu SCD 1 i interfejsu języka Python tabel delta live tables klucz podstawowy jest wartością parametru
keys
apply_changes()
w funkcji. W przypadku interfejsu SQL usługi Delta Live Tables klucz podstawowy to kolumny zdefiniowane przez klauzulęKEYS
w instrukcjiAPPLY CHANGES INTO
. - W przypadku typu SCD 2 klucz podstawowy jest parametrem lub klauzulą
keys
oraz wartością zwracaną zcoalesce(__START_AT, __END_AT)
operacji, gdzie__START_AT
i__END_AT
są odpowiednimi kolumnami z docelowej tabeli przesyłania strumieniowego.KEYS
Pobieranie danych dotyczących rekordów przetwarzanych przez zapytanie CDC tabel delta Live Tables
Uwaga
Poniższe metryki są przechwytywane tylko przez APPLY CHANGES
zapytania, a nie zapytania APPLY CHANGES FROM SNAPSHOT
.
Następujące metryki są przechwytywane przez APPLY CHANGES
zapytania:
num_upserted_rows
: liczba wierszy wyjściowych upserted do zestawu danych podczas aktualizacji.num_deleted_rows
: liczba istniejących wierszy wyjściowych usuniętych z zestawu danych podczas aktualizacji.
num_output_rows
Metryka, dane wyjściowe dla przepływów innych niż CDC, nie są przechwytywane dla apply changes
zapytań.
Jakie obiekty danych są używane do przetwarzania delta Live Tables CDC?
Uwaga: Następujące struktury danych dotyczą tylko APPLY CHANGES
przetwarzania, a nie APPLY CHANGES FROM SNAPSHOT
przetwarzania.
Po zadeklarowaniu tabeli docelowej w magazynie metadanych Hive tworzone są dwie struktury danych:
- Widok używający nazwy przypisanej do tabeli docelowej.
- Wewnętrzna tabela zapasowa używana przez tabele delta Live Tables do zarządzania przetwarzaniem CDC. Ta tabela ma nazwę prepending
__apply_changes_storage_
na nazwę tabeli docelowej.
Jeśli na przykład zadeklarujesz tabelę docelową o nazwie dlt_cdc_target
, zobaczysz widok o nazwie i tabelę o nazwie dlt_cdc_target
__apply_changes_storage_dlt_cdc_target
w magazynie metadanych. Utworzenie widoku umożliwia funkcji Delta Live Tables filtrowanie dodatkowych informacji (na przykład grobowce i wersje) wymaganych do obsługi danych poza kolejnością. Aby wyświetlić przetworzone dane, wykonaj zapytanie względem widoku docelowego. Ponieważ schemat __apply_changes_storage_
tabeli może ulec zmianie w celu obsługi przyszłych funkcji lub ulepszeń, nie należy wykonywać zapytań dotyczących tabeli pod kątem użycia w środowisku produkcyjnym. W przypadku ręcznego dodawania danych do tabeli zakłada się, że rekordy zostaną wprowadzone przed innymi zmianami, ponieważ brakuje kolumn wersji.
Jeśli potok publikuje w wykazie aparatu Unity, wewnętrzne tabele kopii zapasowych są niedostępne dla użytkowników.