Udostępnij za pośrednictwem


Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo

Funkcja Delta Live Tables upraszcza przechwytywanie zmian danych (CDC) przy użyciu interfejsów APPLY CHANGES API i APPLY CHANGES FROM SNAPSHOT . Używany interfejs zależy od źródła danych zmian:

  • Służy APPLY CHANGES do przetwarzania zmian ze źródła danych zmian (CDF).
  • Użyj APPLY CHANGES FROM SNAPSHOT funkcji (publiczna wersja zapoznawcza), aby przetworzyć zmiany w migawkach bazy danych.

Wcześniej instrukcja MERGE INTO była często używana do przetwarzania rekordów CDC w usłudze Azure Databricks. MERGE INTO Może jednak generować nieprawidłowe wyniki z powodu rekordów poza sekwencją lub wymaga złożonej logiki ponownej kolejności rekordów.

Interfejs APPLY CHANGES API jest obsługiwany w interfejsach DELTA Live Tables SQL i Python. Interfejs APPLY CHANGES FROM SNAPSHOT API jest obsługiwany w interfejsie języka Python tabel delta Live Tables.

Zarówno, jak APPLY CHANGES i APPLY CHANGES FROM SNAPSHOT obsługują aktualizowanie tabel przy użyciu typu SCD 1 i typu 2:

  • Użyj typu SCD 1, aby bezpośrednio aktualizować rekordy. Historia nie jest zachowywana dla zaktualizowanych rekordów.
  • Użyj typu SCD 2, aby zachować historię rekordów we wszystkich aktualizacjach lub aktualizacjach określonego zestawu kolumn.

Aby uzyskać informacje o składni i innych odwołaniach, zobacz:

Uwaga

W tym artykule opisano sposób aktualizowania tabel w potoku delta Live Tables na podstawie zmian w danych źródłowych. Aby dowiedzieć się, jak rejestrować i wykonywać zapytania dotyczące zmian na poziomie wiersza dla tabel delty, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).

Wymagania

Aby korzystać z interfejsów API usługi CDC, potok musi być skonfigurowany do używania bezserwerowych potoków DLT lub tabel delta Live Tables Pro lub Advanced edycji.

Jak usługa CDC jest implementowana za pomocą interfejsu APPLY CHANGES API?

Dzięki automatycznej obsłudze rekordów poza sekwencją interfejs APPLY CHANGES API w tabelach Delta Live Tables zapewnia poprawne przetwarzanie rekordów CDC i eliminuje konieczność opracowania złożonej logiki do obsługi rekordów poza sekwencją. Należy określić kolumnę w danych źródłowych, na których mają być sekwencjonujące rekordy, które tabele Delta Live Tables interpretują jako monotonicznie zwiększającą reprezentację prawidłowego porządku danych źródłowych. Delta Live Tables automatycznie obsługuje dane odbierane z zamówienia. W przypadku zmian typu SCD 2 tabele delta live propagują odpowiednie wartości sekwencjonowania do kolumn i __END_AT tabeli __START_AT docelowej. Każda wartość sekwencjonowania powinna zawierać jedną odrębną aktualizację na klucz, a wartości sekwencjonowania NULL nie są obsługiwane.

Aby wykonać przetwarzanie APPLY CHANGESCDC za pomocą polecenia , należy najpierw utworzyć tabelę przesyłania strumieniowego, a następnie użyć APPLY CHANGES INTO instrukcji w języku SQL lub apply_changes() funkcji w języku Python, aby określić źródło, klucze i sekwencjonowanie zestawienia zmian. Aby utworzyć docelową tabelę przesyłania strumieniowego, użyj CREATE OR REFRESH STREAMING TABLE instrukcji w języku SQL lub create_streaming_table() funkcji w języku Python. Zobacz przykłady przetwarzania typu SCD 1 i typ 2.

Aby uzyskać szczegółowe informacje o składni, zobacz dokumentację SQL tabel delta Live Tables lub dokumentację języka Python.

Jak usługa CDC jest implementowana za pomocą interfejsu APPLY CHANGES FROM SNAPSHOT API?

Ważne

Interfejs APPLY CHANGES FROM SNAPSHOT API jest w publicznej wersji zapoznawczej.

APPLY CHANGES FROM SNAPSHOT to deklaratywny interfejs API, który efektywnie określa zmiany w danych źródłowych, porównując serię migawek w kolejności, a następnie uruchamia przetwarzanie wymagane do przetwarzania cdC rekordów w migawkach. APPLY CHANGES FROM SNAPSHOT jest obsługiwany tylko przez interfejs języka Python tabel delta Live Tables.

APPLY CHANGES FROM SNAPSHOT obsługuje pozyskiwanie migawek z wielu typów źródłowych:

  • Użyj okresowego pozyskiwania migawek do pozyskiwania migawek z istniejącej tabeli lub widoku. APPLY CHANGES FROM SNAPSHOT Ma prosty, usprawniony interfejs do obsługi okresowego pozyskiwania migawek z istniejącego obiektu bazy danych. Nowa migawka jest pozyskiwana wraz z każdą aktualizacją potoku, a czas pozyskiwania jest używany jako wersja migawki. Gdy potok jest uruchamiany w trybie ciągłym, wiele migawek jest pozyskiwanych z każdą aktualizacją potoku w okresie określonym przez ustawienie interwału wyzwalacza dla przepływu zawierającego zastosowanie zmian z przetwarzania migawki.
  • Pozyskiwanie historycznych migawek służy do przetwarzania plików zawierających migawki bazy danych, takich jak migawki wygenerowane na podstawie bazy danych Oracle lub MySQL albo magazynu danych.

Aby wykonać przetwarzanie CDC z dowolnego typu APPLY CHANGES FROM SNAPSHOTźródła za pomocą polecenia , należy najpierw utworzyć tabelę przesyłania strumieniowego, a następnie użyć apply_changes_from_snapshot() funkcji w języku Python do określenia migawki, kluczy i innych argumentów wymaganych do zaimplementowania przetwarzania. Zobacz przykłady okresowego pozyskiwania migawek i historycznego pozyskiwania migawek.

Migawki przekazywane do interfejsu API muszą być w kolejności rosnącej według wersji. Jeśli funkcja Delta Live Tables wykryje migawkę poza kolejnością, zostanie zgłoszony błąd.

Aby uzyskać szczegółowe informacje o składni, zobacz dokumentację języka Python tabel delta live.

Ograniczenia

Kolumna używana do sekwencjonowania musi być sortowalnym typem danych.

Przykład: typ SCD 1 i typ SCD 2 przetwarzania z danymi źródłowymi CDF

W poniższych sekcjach przedstawiono przykłady różnicowych tabel na żywo typu SCD typu 1 i 2 zapytania aktualizujące tabele docelowe na podstawie zdarzeń źródłowych ze źródła danych, które:

  1. Tworzy nowe rekordy użytkowników.
  2. Usuwa rekord użytkownika.
  3. Aktualizuje rekordy użytkowników. W przykładzie scD typu 1 ostatnie UPDATE operacje docierają późno i są usuwane z tabeli docelowej, pokazując obsługę zdarzeń poza kolejnością.

W poniższych przykładach założono, że znajomość konfigurowania i aktualizowania potoków tabel na żywo delty. Zobacz Samouczek: uruchamianie pierwszego potoku delty tabel na żywo.

Aby uruchomić te przykłady, musisz zacząć od utworzenia przykładowego zestawu danych. Zobacz Generowanie danych testowych.

Poniżej przedstawiono rekordy wejściowe dla tych przykładów:

Identyfikator użytkownika name miejscowość rozdzielnicy sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lilia Cancun INSERT 2
123 null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Jeśli usuniesz komentarz z końcowego wiersza w przykładowych danych, wstawi następujący rekord, który określa, gdzie rekordy powinny być obcięte:

Identyfikator użytkownika name miejscowość rozdzielnicy sequenceNum
null null null OBCIĄĆ 3

Uwaga

Wszystkie poniższe przykłady obejmują opcje określania operacji DELETE i TRUNCATE , ale każdy z nich jest opcjonalny.

Przetwarzanie aktualizacji typu SCD 1

W poniższym przykładzie pokazano przetwarzanie aktualizacji typu 1 protokołu SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Po uruchomieniu przykładu typu SCD 1 tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika name miejscowość
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lilia Cancun

Po uruchomieniu przykładowego typu SCD 1 z dodatkowym TRUNCATE rekordem rekordy 124 i 126 są obcinane z TRUNCATE powodu operacji w sequenceNum=3lokalizacji , a tabela docelowa zawiera następujący rekord:

Identyfikator użytkownika name miejscowość
125 Mercedes Guadalajara

Przetwarzanie aktualizacji typu SCD 2

W poniższym przykładzie pokazano przetwarzanie aktualizacji typu 2 protokołu SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Po uruchomieniu przykładowego typu SCD 2 tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika name miejscowość __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lilia Cancun 2 null

Zapytanie typu SCD 2 może również określać podzestaw kolumn wyjściowych do śledzenia historii w tabeli docelowej. Zmiany w innych kolumnach są aktualizowane zamiast generowania nowych rekordów historii. W poniższym przykładzie pokazano wykluczenie kolumny city ze śledzenia:

W poniższym przykładzie pokazano użycie historii śledzenia z typem SCD 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Po uruchomieniu tego przykładu bez dodatkowego TRUNCATE rekordu tabela docelowa zawiera następujące rekordy:

Identyfikator użytkownika name miejscowość __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lilia Cancun 2 null

Generowanie danych testowych

Poniższy kod służy do generowania przykładowego zestawu danych do użycia w przykładowych zapytaniach znajdujących się w tym samouczku. Zakładając, że masz odpowiednie poświadczenia do utworzenia nowego schematu i utworzenia nowej tabeli, możesz uruchomić te instrukcje za pomocą notesu lub bazy danych Databricks SQL. Poniższy kod nie jest przeznaczony do uruchomienia w ramach potoku delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Przykład: Okresowe przetwarzanie migawek

W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które pozyskuje migawki tabeli przechowywanej w lokalizacji mycatalog.myschema.mytable. Wyniki przetwarzania są zapisywane w tabeli o nazwie target.

mycatalog.myschema.mytable rekordy w znaczniku czasu 2024-01-01 00:00:00

Key Wartość
1 a1
2 a2

mycatalog.myschema.mytable rekordy w znaczniku czasu 2024-01-01 12:00:00

Key Wartość
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:

Key Wartość __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 null
3 a3 2024-01-01 12:00:00 null

Przykład: Historyczne przetwarzanie migawek

W poniższym przykładzie pokazano przetwarzanie typu SCD 2, które aktualizuje tabelę docelową na podstawie zdarzeń źródłowych z dwóch migawek przechowywanych w systemie magazynu w chmurze:

Migawka w lokalizacji timestamp, przechowywana w /<PATH>/filename1.csv

Klucz Kolumna śledzenia NonTrackingColumn
1 a1 b1
2 a2 b2
100 a4 b4

Migawka w lokalizacji timestamp + 5, przechowywana w /<PATH>/filename2.csv

Klucz Kolumna śledzenia NonTrackingColumn
2 a2_new b2
3 a3 b3
100 a4 b4_new

W poniższym przykładzie kodu pokazano przetwarzanie aktualizacji typu 2 protokołu SCD przy użyciu tych migawek:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Po przetworzeniu migawek tabela docelowa zawiera następujące rekordy:

Klucz Kolumna śledzenia NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
100 a4 b4_new 1 null

Dodawanie, zmienianie lub usuwanie danych w docelowej tabeli przesyłania strumieniowego

Jeśli potok publikuje tabele w wykazie aparatu Unity, możesz użyć instrukcji języka manipulowania danymi (DML), w tym wstawiania, aktualizowania, usuwania i scalania instrukcji, aby modyfikować docelowe tabele przesyłania strumieniowego utworzone przez APPLY CHANGES INTO instrukcje.

Uwaga

  • Instrukcje DML modyfikujące schemat tabeli przesyłania strumieniowego nie są obsługiwane. Upewnij się, że instrukcje DML nie próbują rozwijać schematu tabeli.
  • Instrukcje DML, które aktualizują tabelę przesyłania strumieniowego, mogą być uruchamiane tylko w udostępnionym klastrze wykazu aparatu Unity lub w usłudze SQL Warehouse przy użyciu środowiska Databricks Runtime 13.3 LTS lub nowszego.
  • Ponieważ przesyłanie strumieniowe wymaga źródeł danych tylko do dołączania, jeśli przetwarzanie wymaga przesyłania strumieniowego ze źródłowej tabeli przesyłania strumieniowego ze zmianami (na przykład instrukcjami DML), ustaw flagę skipChangeCommits podczas odczytywania źródłowej tabeli przesyłania strumieniowego. Po skipChangeCommits ustawieniu transakcje, które usuwają lub modyfikują rekordy w tabeli źródłowej, są ignorowane. Jeśli przetwarzanie nie wymaga tabeli przesyłania strumieniowego, możesz użyć zmaterializowanego widoku (który nie ma ograniczenia tylko do dołączania) jako tabeli docelowej.

Ponieważ tabele delta live używa określonej SEQUENCE BY kolumny i propagują odpowiednie wartości sekwencjonowania do __START_AT kolumn i __END_AT tabeli docelowej (dla typu SCD 2), należy upewnić się, że instrukcje DML używają prawidłowych wartości dla tych kolumn, aby zachować właściwą kolejność rekordów. Zobacz Jak usługa CDC jest implementowana za pomocą interfejsu API APPLY CHANGES?.

Aby uzyskać więcej informacji na temat używania instrukcji DML z tabelami przesyłania strumieniowego, zobacz Dodawanie, zmienianie lub usuwanie danych w tabeli przesyłania strumieniowego.

Poniższy przykład wstawia aktywny rekord z sekwencją początkową 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Odczytywanie zestawienia danych zmian z APPLY CHANGES tabeli docelowej

W środowisku Databricks Runtime 15.2 lub nowszym można odczytać zestawienie danych zmian z tabeli przesyłania strumieniowego, która jest celem APPLY CHANGES zapytania lub APPLY CHANGES FROM SNAPSHOT w taki sam sposób, jak odczytywanie zestawienia zmian danych z innych tabel delty. Aby odczytać źródło danych zmian z docelowej tabeli przesyłania strumieniowego, wymagane są następujące elementy:

  • Docelowa tabela przesyłania strumieniowego musi zostać opublikowana w wykazie aparatu Unity. Zobacz Używanie wykazu aparatu Unity z potokami delta Live Tables.
  • Aby odczytać zestawienie zmian danych z docelowej tabeli przesyłania strumieniowego, musisz użyć środowiska Databricks Runtime 15.2 lub nowszego. Aby odczytać zestawienie danych zmian w innym potoku delta Live Tables, potok musi być skonfigurowany do używania środowiska Databricks Runtime 15.2 lub nowszego.

Odczytasz źródło danych zmian z docelowej tabeli przesyłania strumieniowego, która została utworzona w potoku delta live tables, tak samo jak odczytywanie zestawienia danych zmian z innych tabel delty. Aby dowiedzieć się więcej na temat korzystania z funkcji źródła danych zmian różnicowych, w tym przykładów w językach Python i SQL, zobacz Use Delta Lake change data feed on Azure Databricks (Używanie zestawienia zmian usługi Delta Lake w usłudze Azure Databricks).

Uwaga

Rekord zestawienia danych zmian zawiera metadane identyfikujące typ zdarzenia zmiany. Po zaktualizowaniu rekordu w tabeli metadane skojarzonych rekordów zmian zwykle zawierają _change_type wartości ustawione na update_preimage i update_postimage zdarzenia.

Jednak wartości są różne, _change_type jeśli aktualizacje są wprowadzane do docelowej tabeli przesyłania strumieniowego, która obejmuje zmianę wartości klucza podstawowego. Gdy zmiany obejmują aktualizacje kluczy podstawowych, _change_type pola metadanych są ustawione na insert i delete zdarzenia. Zmiany kluczy podstawowych mogą wystąpić, gdy aktualizacje ręczne są wprowadzane do jednego z pól klucza z UPDATE instrukcją lub MERGE , w przypadku tabel typu SCD 2, gdy __start_at pole zmieni się w celu odzwierciedlenia wcześniejszej wartości sekwencji początkowej.

Zapytanie APPLY CHANGES określa wartości klucza podstawowego, które różnią się w przypadku przetwarzania typu SCD 1 i SCD typu 2:

  • W przypadku przetwarzania typu SCD 1 i interfejsu języka Python tabel delta live tables klucz podstawowy jest wartością parametru keys apply_changes() w funkcji. W przypadku interfejsu SQL usługi Delta Live Tables klucz podstawowy to kolumny zdefiniowane przez klauzulę KEYS w instrukcji APPLY CHANGES INTO .
  • W przypadku typu SCD 2 klucz podstawowy jest parametrem lub klauzulą keys oraz wartością zwracaną z coalesce(__START_AT, __END_AT) operacji, gdzie __START_AT i __END_AT są odpowiednimi kolumnami z docelowej tabeli przesyłania strumieniowego.KEYS

Pobieranie danych dotyczących rekordów przetwarzanych przez zapytanie CDC tabel delta Live Tables

Uwaga

Poniższe metryki są przechwytywane tylko przez APPLY CHANGES zapytania, a nie zapytania APPLY CHANGES FROM SNAPSHOT .

Następujące metryki są przechwytywane przez APPLY CHANGES zapytania:

  • num_upserted_rows: liczba wierszy wyjściowych upserted do zestawu danych podczas aktualizacji.
  • num_deleted_rows: liczba istniejących wierszy wyjściowych usuniętych z zestawu danych podczas aktualizacji.

num_output_rows Metryka, dane wyjściowe dla przepływów innych niż CDC, nie są przechwytywane dla apply changes zapytań.

Jakie obiekty danych są używane do przetwarzania delta Live Tables CDC?

Uwaga: Następujące struktury danych dotyczą tylko APPLY CHANGES przetwarzania, a nie APPLY CHANGES FROM SNAPSHOT przetwarzania.

Po zadeklarowaniu tabeli docelowej w magazynie metadanych Hive tworzone są dwie struktury danych:

  • Widok używający nazwy przypisanej do tabeli docelowej.
  • Wewnętrzna tabela zapasowa używana przez tabele delta Live Tables do zarządzania przetwarzaniem CDC. Ta tabela ma nazwę prepending __apply_changes_storage_ na nazwę tabeli docelowej.

Jeśli na przykład zadeklarujesz tabelę docelową o nazwie dlt_cdc_target, zobaczysz widok o nazwie i tabelę o nazwie dlt_cdc_target __apply_changes_storage_dlt_cdc_target w magazynie metadanych. Utworzenie widoku umożliwia funkcji Delta Live Tables filtrowanie dodatkowych informacji (na przykład grobowce i wersje) wymaganych do obsługi danych poza kolejnością. Aby wyświetlić przetworzone dane, wykonaj zapytanie względem widoku docelowego. Ponieważ schemat __apply_changes_storage_ tabeli może ulec zmianie w celu obsługi przyszłych funkcji lub ulepszeń, nie należy wykonywać zapytań dotyczących tabeli pod kątem użycia w środowisku produkcyjnym. W przypadku ręcznego dodawania danych do tabeli zakłada się, że rekordy zostaną wprowadzone przed innymi zmianami, ponieważ brakuje kolumn wersji.

Jeśli potok publikuje w wykazie aparatu Unity, wewnętrzne tabele kopii zapasowych są niedostępne dla użytkowników.