Dokumentacja języka SQL usługi Delta Live Tables
Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania SQL delta Live Tables.
- Aby uzyskać informacje na temat interfejsu API języka Python, zobacz dokumentację języka Python tabel delta live tables.
- Aby uzyskać więcej informacji na temat poleceń SQL, zobacz Dokumentacja języka SQL.
W zapytaniach SQL można używać funkcji zdefiniowanych przez użytkownika (UDF) języka Python, ale przed wywołaniem ich w plikach źródłowych SQL należy zdefiniować te funkcje zdefiniowane przez użytkownika. Zobacz Funkcje skalarne zdefiniowane przez użytkownika — Python.
Ograniczenia
Klauzula nie jest obsługiwana PIVOT
. Operacja pivot
na platformie Spark wymaga chętnego ładowania danych wejściowych w celu obliczenia schematu wyjściowego. Ta funkcja nie jest obsługiwana w tabelach delta live.
Tworzenie tabel delta live zmaterializowanego widoku lub tabeli przesyłania strumieniowego
Uwaga
- Składnia
CREATE OR REFRESH LIVE TABLE
tworzenia zmaterializowanego widoku jest przestarzała. Zamiast tego użyj poleceniaCREATE OR REFRESH MATERIALIZED VIEW
. - Aby użyć klauzuli
CLUSTER BY
w celu włączenia klastrowania liquid, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.
Ta sama podstawowa składnia SQL jest używana podczas deklarowania tabeli przesyłania strumieniowego lub zmaterializowanego widoku.
Deklarowanie zmaterializowanego widoku tabel różnicowych na żywo za pomocą języka SQL
Poniżej opisano składnię deklarowania zmaterializowanego widoku w tabelach Delta Live Tables za pomocą języka SQL:
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Deklarowanie tabeli przesyłania strumieniowego tabel delta live tables za pomocą języka SQL
Tabele przesyłania strumieniowego można zadeklarować tylko przy użyciu zapytań odczytywanych względem źródła przesyłania strumieniowego. Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania plików przesyłanych strumieniowo z magazynu obiektów w chmurze. Zobacz Auto loader SQL syntax (Składnia SQL modułu automatycznego ładowania).
Podczas określania innych tabel lub widoków w potoku jako źródeł przesyłania strumieniowego należy uwzględnić STREAM()
funkcję wokół nazwy zestawu danych.
Poniżej opisano składnię deklarowania tabeli przesyłania strumieniowego w tabelach delta Live Tables za pomocą języka SQL:
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Tworzenie widoku tabel na żywo delty
Poniżej opisano składnię deklarowania widoków za pomocą języka SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Składnia SQL automatycznego modułu ładującego
Poniżej opisano składnię pracy z modułem automatycznego ładowania w programie SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
Możesz użyć obsługiwanych opcji formatowania z modułem automatycznego ładowania. map()
Za pomocą funkcji można przekazać opcje do read_files()
metody . Opcje to pary klucz-wartość, w których klucze i wartości są ciągami. Aby uzyskać szczegółowe informacje na temat formatów i opcji obsługi, zobacz Opcje formatu pliku.
Przykład: Definiowanie tabel
Zestaw danych można utworzyć, odczytując z zewnętrznego źródła danych lub z zestawów danych zdefiniowanych w potoku. Aby odczytać z wewnętrznego zestawu danych, należy LIVE
wstępnie wpisać słowo kluczowe na nazwę zestawu danych. Poniższy przykład definiuje dwa różne zestawy danych: tabelę o nazwie taxi_raw
, która przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data
, która przyjmuje taxi_raw
tabelę jako dane wejściowe:
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Przykład: odczyt ze źródła przesyłania strumieniowego
Aby odczytać dane ze źródła przesyłania strumieniowego, na przykład moduł automatycznego ładowania lub wewnętrzny zestaw danych, zdefiniuj tabelę STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
Aby uzyskać więcej informacji na temat danych przesyłanych strumieniowo, zobacz Przekształcanie danych za pomocą tabel delta live.
Kontrolowanie sposobu materializacji tabel
Tabele oferują również dodatkową kontrolę nad ich materializacją:
- Określ sposób partycjonowania tabel przy użyciu polecenia
PARTITIONED BY
. Możesz użyć partycjonowania, aby przyspieszyć wykonywanie zapytań. - Właściwości tabeli można ustawić przy użyciu polecenia
TBLPROPERTIES
. Zobacz Właściwości tabeli Tabele na żywo funkcji Delta. - Ustaw lokalizację magazynu przy użyciu
LOCATION
ustawienia . Domyślnie dane tabeli są przechowywane w lokalizacji magazynu potoku, jeśliLOCATION
nie są ustawione. - W definicji schematu można użyć wygenerowanych kolumn . Zobacz Przykład: określanie schematu i kolumn partycji.
Uwaga
W przypadku tabel o rozmiarze mniejszym niż 1 TB usługa Databricks zaleca umożliwienie usłudze Delta Live Tables kontrolowania organizacji danych. Jeśli nie spodziewasz się, że tabela przekroczy terabajt, usługa Databricks zaleca, aby nie określać kolumn partycji.
Przykład: określanie schematu i kolumn partycji
Opcjonalnie można określić schemat podczas definiowania tabeli. Poniższy przykład określa schemat tabeli docelowej, w tym użycie kolumn wygenerowanych przez usługę Delta Lake i zdefiniowanie kolumn partycji dla tabeli:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Domyślnie tabele na żywo delty wywnioskują schemat z table
definicji, jeśli nie określisz schematu.
Przykład: Definiowanie ograniczeń tabeli
Uwaga
Obsługa funkcji Delta Live Tables dla ograniczeń tabel jest dostępna w publicznej wersji zapoznawczej. Aby zdefiniować ograniczenia tabeli, potok musi być potokiem obsługującym wykaz aparatu Unity i skonfigurowanym do korzystania z kanału preview
.
Podczas określania schematu można zdefiniować klucze podstawowe i obce. Ograniczenia są informacyjne i nie są wymuszane. Zobacz klauzulę CONSTRAINT w dokumentacji języka SQL.
W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Parametryzowanie wartości używanych podczas deklarowania tabel lub widoków przy użyciu języka SQL
Służy SET
do określania wartości konfiguracji w zapytaniu, które deklaruje tabelę lub widok, w tym konfiguracje platformy Spark. Każda tabela lub widok zdefiniowany w notesie po SET
instrukcji ma dostęp do zdefiniowanej wartości. Wszystkie konfiguracje platformy Spark określone przy użyciu SET
instrukcji są używane podczas wykonywania zapytania Spark dla dowolnej tabeli lub widoku zgodnie z instrukcją SET. Aby odczytać wartość konfiguracji w zapytaniu, użyj składni ${}
interpolacji ciągów . W poniższym przykładzie ustawiono wartość konfiguracji platformy Spark o nazwie startDate
i użyto tej wartości w zapytaniu:
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Aby określić wiele wartości konfiguracji, użyj oddzielnej SET
instrukcji dla każdej wartości.
Przykład: Definiowanie filtru wierszy i maski kolumn
Ważne
Filtry wierszy i maski kolumn są w publicznej wersji zapoznawczej.
Aby utworzyć zmaterializowany widok lub tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn, użyj klauzuli ROW FILTER i klauzuli MASK. W poniższym przykładzie pokazano, jak zdefiniować zmaterializowany widok i tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze
Aby uzyskać więcej informacji na temat filtrów wierszy i masek kolumn, zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.
Właściwości sql
Uwaga
Aby użyć klauzuli CLUSTER BY
w celu włączenia klastrowania liquid, potok musi być skonfigurowany do korzystania z kanału w wersji zapoznawczej.
TWORZENIE TABELI LUB WIDOKU |
---|
TEMPORARY Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Klauzula TEMPORARY instruuje tabele delta Live Tables, aby utworzyć tabelę dostępną dla potoku, ale nie należy uzyskiwać dostępu poza potokiem. Aby skrócić czas przetwarzania, tabela tymczasowa utrzymuje się przez okres istnienia potoku, który go tworzy, a nie tylko pojedynczą aktualizację. |
STREAMING Utwórz tabelę, która odczytuje wejściowy zestaw danych jako strumień. Wejściowy zestaw danych musi być źródłem danych przesyłania strumieniowego, na przykład modułem automatycznego ładowania lub tabelą STREAMING . |
CLUSTER BY Włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania. Zobacz Użyj płynnego klastrowania dla tabel typu Delta). |
PARTITIONED BY Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli. |
LOCATION Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system będzie domyślnie ustawiony na lokalizację przechowywania potoku. |
COMMENT Opcjonalny opis tabeli. |
column_constraint Opcjonalne ograniczenie klucza podstawowego lub klucza obcego w kolumnie. |
MASK clause (Publiczna wersja zapoznawcza)Dodaje funkcję maski kolumn do anonimowości poufnych danych. Przyszłe zapytania dla tej kolumny zwracają wynik ocenianej funkcji zamiast oryginalnej wartości kolumny. Jest to przydatne w przypadku szczegółowej kontroli dostępu, ponieważ funkcja może sprawdzić tożsamość użytkownika i członkostwo w grupach, aby zdecydować, czy zredagować wartość. Zobacz Klauzula maski kolumny. |
table_constraint Opcjonalne ograniczenie klucza podstawowego lub klucza obcego w tabeli. |
TBLPROPERTIES Opcjonalna lista właściwości tabeli. |
WITH ROW FILTER clause (Publiczna wersja zapoznawcza)Dodaje do tabeli funkcję filtru wierszy. Przyszłe zapytania dotyczące tej tabeli otrzymują podzbiór wierszy, dla których funkcja daje wartość TRUE. Jest to przydatne w przypadku szczegółowej kontroli dostępu, ponieważ umożliwia funkcji inspekcję tożsamości i członkostwa w grupach użytkownika wywołującego, aby zdecydować, czy filtrować niektóre wiersze. Zobacz klauzulę FILTR WIERSZA. |
select_statement Zapytanie Delta Live Tables, które definiuje zestaw danych dla tabeli. |
CONSTRAINT, klauzula |
---|
EXPECT expectation_name Zdefiniuj ograniczenie expectation_name dotyczące jakości danych. ON VIOLATION Jeśli ograniczenie nie jest zdefiniowane, dodaj wiersze naruszające ograniczenie do docelowego zestawu danych. |
ON VIOLATION Opcjonalna akcja do wykonania dla wierszy, które zakończyły się niepowodzeniem: - FAIL UPDATE : Natychmiastowe zatrzymywanie wykonywania potoku.- DROP ROW : Upuść rekord i kontynuuj przetwarzanie. |
Zmienianie przechwytywania danych za pomocą bazy danych SQL w tabelach delta live
Użyj instrukcji APPLY CHANGES INTO
, aby użyć funkcji CDC tabel delta live, zgodnie z opisem w następujących artykułach:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Definiujesz ograniczenia dotyczące jakości danych dla APPLY CHANGES
obiektu docelowego przy użyciu tej samej CONSTRAINT
klauzuli co zapytania inneAPPLY CHANGES
niż zapytania. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.
Uwaga
Domyślne zachowanie dla INSERT
zdarzeń i UPDATE
dotyczy upsert zdarzeń CDC ze źródła: zaktualizować wszystkie wiersze w tabeli docelowej, które są zgodne z określonymi kluczami lub wstawić nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE
można określić za pomocą APPLY AS DELETE WHEN
warunku.
Ważne
Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu APPLY CHANGES
tabeli docelowej należy również uwzględnić __START_AT
kolumny i __END_AT
o tym samym typie sequence_by
danych co pole.
Zobacz Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo.
Klauzule |
---|
KEYS Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej. Aby zdefiniować kombinację kolumn, użyj rozdzielanej przecinkami listy kolumn. Ta klauzula jest wymagana. |
IGNORE NULL UPDATES Zezwalaj na pozyskiwanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i zostanie określone ignoruj aktualizacje o wartości NULL, kolumny z wartością null zachowają istniejące wartości w obiekcie docelowym. Dotyczy to również zagnieżdżonych kolumn z wartością null .Ta klauzula jest opcjonalna. Wartością domyślną jest zastąpienie istniejących kolumn wartościami null . |
APPLY AS DELETE WHEN Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako grób w bazowej tabeli delty, a widok jest tworzony w magazynie metadanych, który filtruje te grobowce. Interwał przechowywania można skonfigurować za pomocą poleceniapipelines.cdc.tombstoneGCThresholdInSeconds właściwość tabeli.Ta klauzula jest opcjonalna. |
APPLY AS TRUNCATE WHEN Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE . Ponieważ ta klauzula wyzwala pełny obcięcie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia wymagających tej funkcji.Klauzula jest obsługiwana APPLY AS TRUNCATE WHEN tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji obcinania.Ta klauzula jest opcjonalna. |
SEQUENCE BY Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Funkcja Delta Live Tables używa tej sekwencjonowania w celu obsługi zdarzeń zmiany, które docierają poza kolejność. Określona kolumna musi być sortowalnym typem danych. Ta klauzula jest wymagana. |
COLUMNS Określa podzbiór kolumn do uwzględnienia w tabeli docelowej. Można: - Określ pełną listę kolumn do uwzględnienia: COLUMNS (userId, name, city) .- Określ listę kolumn do wykluczenia: COLUMNS * EXCEPT (operation, sequenceNum) Ta klauzula jest opcjonalna. Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy klauzula nie zostanie określona COLUMNS . |
STORED AS Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2. Ta klauzula jest opcjonalna. Wartość domyślna to SCD typ 1. |
TRACK HISTORY ON Określa podzbiór kolumn wyjściowych do generowania rekordów historii, gdy istnieją jakiekolwiek zmiany w tych określonych kolumnach. Można: - Określ pełną listę kolumn do śledzenia: COLUMNS (userId, name, city) .- Określ listę kolumn, które mają być wykluczone ze śledzenia: COLUMNS * EXCEPT (operation, sequenceNum) Ta klauzula jest opcjonalna. Wartością domyślną jest śledzenie historii dla wszystkich kolumn wyjściowych, gdy istnieją jakiekolwiek zmiany, równoważne . TRACK HISTORY ON * |