Udostępnij za pośrednictwem


Dokumentacja języka SQL DLT

Ten artykuł zawiera szczegółowe informacje dotyczące interfejsu programowania DLT SQL.

  • Aby uzyskać informacje na temat interfejsu API języka Python, zobacz dokumentację języka Python DLT.
  • Aby uzyskać więcej informacji na temat poleceń SQL, zobacz odniesienie do języka SQL.

W zapytaniach SQL można używać funkcji zdefiniowanych przez użytkownika (UDF) w Pythonie, ale przed ich wywołaniem w plikach źródłowych SQL, należy je zdefiniować w plikach języka Python. Zobacz Funkcje skalarne zdefiniowane przez użytkownika — Python.

Ograniczenia

Klauzula PIVOT nie jest obsługiwana. Operacja pivot na platformie Spark wymaga załadowania danych wejściowych, aby obliczyć schemat wyjściowy. Ta funkcja nie jest obsługiwana w technologii DLT.

Utwórz zmaterializowany widok DLT lub tabelę przesyłania strumieniowego

Notatka

Składnia CREATE OR REFRESH LIVE TABLE umożliwiająca utworzenie zmaterializowanego widoku jest przestarzała. Zamiast tego użyj CREATE OR REFRESH MATERIALIZED VIEW.

Używasz tej samej podstawowej składni SQL przy deklarowaniu tabeli przesyłania strumieniowego lub widoku zmaterializowanego.

Deklarowanie zmaterializowanego widoku DLT za pomocą języka SQL

Poniżej opisano składnię deklarowania zmaterializowanego widoku w technologii DLT za pomocą języka SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  CLUSTER BY clause
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Deklarowanie tabeli przesyłania strumieniowego DLT przy użyciu języka SQL

Tabele przesyłania strumieniowego można zadeklarować tylko przy użyciu zapytań odczytywanych względem źródła przesyłania strumieniowego. Usługa Databricks zaleca używanie automatycznego modułu ładującego do pozyskiwania plików przesyłanych strumieniowo z magazynu obiektów w chmurze. Zobacz składnię SQL automatycznego modułu ładującego.

Podczas określania innych tabel lub widoków w potoku jako źródeł przesyłania strumieniowego należy użyć funkcji STREAM() przy nazwie zestawu danych.

Poniżej opisano składnię deklarowania tabeli przesyłania strumieniowego w dlT za pomocą języka SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [CLUSTER BY clause]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Tworzenie widoku DLT

Poniżej opisano składnię deklarowania widoków za pomocą języka SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

składnia SQL automatycznego modułu ładującego

Poniżej opisano składnię pracy z modułem automatycznego ładowania w programie SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value>",
      "<option-key>", "<option_value>",
      ...
    )
  )

Możesz użyć obsługiwanych opcji formatowania z Auto Loaderem. Za pomocą funkcji map() można przekazać opcje do metody read_files(). Opcje to pary klucz-wartość, w których klucze i wartości są ciągami. Aby uzyskać szczegółowe informacje na temat formatów i opcji obsługi, zobacz Opcje formatu pliku.

Przykład: Definiowanie tabel

Możesz utworzyć zestaw danych, korzystając z zewnętrznego źródła danych lub z zestawów danych zdefiniowanych w przepływie danych. Aby odczytać z wewnętrznego zestawu danych, określ nazwę tabeli, która będzie używać skonfigurowanych wartości domyślnych potoku dla wykazu i schematu. W poniższym przykładzie zdefiniowano dwa różne zestawy danych: tabelę o nazwie taxi_raw, która przyjmuje plik JSON jako źródło wejściowe i tabelę o nazwie filtered_data, która przyjmuje tabelę taxi_raw jako dane wejściowe:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

Przykład: odczyt ze źródła przesyłania strumieniowego

Aby odczytać dane ze źródła przesyłania strumieniowego, na przykład moduł automatycznego ładowania lub wewnętrzny zestaw danych, zdefiniuj tabelę STREAMING:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Aby uzyskać więcej informacji na temat danych przesyłanych strumieniowo, zobacz Przekształcanie danych za pomocą potoków.

Trwale usuń rekordy z zmaterializowanego widoku lub tabeli strumieniowej

Aby trwale usunąć rekordy z zmaterializowanego widoku lub tabeli przesyłania strumieniowego z włączonymi wektorami usuwania, na przykład w celu zgodności z RODO, należy wykonać dodatkowe operacje na podstawowych tabelach Delta obiektu. Aby zapewnić usunięcie rekordów z zmaterializowanego widoku, zobacz Trwałe usuwanie rekordów z zmaterializowanego widoku z włączonymi wektorami usuwania. Aby zapewnić usunięcie rekordów z tabeli przesyłania strumieniowego, zobacz Trwałe usuwanie rekordów z tabeli przesyłania strumieniowego.

Kontrolowanie sposobu materializacji tabel

Tabele oferują również dodatkową kontrolę nad ich materializacją:

Nota

W przypadku tabel o rozmiarze mniejszym niż 1 TB, usługa Databricks zaleca pozwolenie DLT na kontrolę organizacji danych. Jeśli nie spodziewasz się, że tabela przekroczy terabajt, usługa Databricks zaleca, aby nie określać kolumn partycji.

Przykład: określ schemat i kolumny klastra

Opcjonalnie można określić schemat podczas definiowania tabeli. Poniższy przykład określa schemat tabeli docelowej, uwzględniając użycie wygenerowanych kolumn Delta Lake , i definiuje kolumny klastrowania dla tabeli.

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Domyślnie biblioteka DLT wywnioskuje schemat z definicji table, jeśli nie określisz schematu.

Przykład: określanie kolumn partycji

Opcjonalnie możesz określić kolumny partycji dla tabeli:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Płynne klastrowanie zapewnia elastyczne, zoptymalizowane rozwiązanie do klastrowania. Rozważ użycie CLUSTER BY zamiast PARTITIONED BY dla DLT.

Przykład: Definiowanie ograniczeń tabeli

Notatka

Obsługa DLT dla ograniczeń tabeli znajduje się w publicznej wersji zapoznawczej. Aby zdefiniować ograniczenia tabeli, potok musi być potokiem z włączoną obsługą Unity Catalog i skonfigurowanym do używania kanału preview.

Podczas określania schematu można zdefiniować klucze podstawowe i obce. Ograniczenia są informacyjne i nie są wymuszane. Zobacz klauzulę CONSTRAINT w dokumentacji języka SQL.

W poniższym przykładzie zdefiniowano tabelę z ograniczeniem klucza podstawowego i obcego:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parametryzowanie wartości używanych podczas deklarowania tabel lub widoków przy użyciu języka SQL

Użyj SET, aby określić wartość konfiguracji w zapytaniu, które deklaruje tabelę lub widok, w tym konfiguracje platformy Spark. Każda tabela lub widok zdefiniowany w notesie po instrukcji SET ma dostęp do zdefiniowanej wartości. Wszystkie konfiguracje platformy Spark określone przy użyciu instrukcji SET są używane podczas wykonywania zapytania Spark dla dowolnej tabeli lub widoku zgodnie z instrukcją SET. Aby odczytać wartość konfiguracji w zapytaniu, użyj składni interpolacji ciągów ${}. W poniższym przykładzie ustawiono wartość konfiguracji platformy Spark o nazwie startDate i użyto tej wartości w zapytaniu:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Aby określić wiele wartości konfiguracji, użyj oddzielnej instrukcji SET dla każdej wartości.

Przykład: Definiowanie filtru wiersza i maski kolumn

Ważny

Filtry wierszy i maski kolumn znajdują się w publicznej wersji zapoznawczej.

Aby utworzyć zmaterializowany widok lub tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn, użyj klauzuli ROW FILTER i klauzuli MASK. W poniższym przykładzie pokazano, jak zdefiniować zmaterializowany widok i tabelę przesyłania strumieniowego z filtrem wierszy i maską kolumn:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

Aby uzyskać więcej informacji na temat filtrów wierszy i masek kolumn, zobacz Publikowanie tabel z filtrami wierszy i maskami kolumn.

Właściwości sql

CREATE TABLE lub WYŚWIETL
TEMPORARY

Utwórz tabelę, ale nie publikuj metadanych dla tabeli. Klauzula TEMPORARY instruuje bibliotekę DLT, aby utworzyć tabelę dostępną dla potoku, ale nie powinna być dostępna poza potokiem. Aby skrócić czas przetwarzania, tabela tymczasowa utrzymuje się przez cały okres istnienia potoku, który ją tworzy, a nie tylko podczas pojedynczej aktualizacji.
STREAMING

Utwórz tabelę, która odczytuje wejściowy zestaw danych jako strumień. Wejściowy zestaw danych musi być źródłem danych przesyłania strumieniowego, na przykład modułem automatycznego ładowania lub tabelą STREAMING.
CLUSTER BY

Włącz płynne klastrowanie w tabeli i zdefiniuj kolumny do użycia jako klucze klastrowania.

Zobacz Użyj grupowania cieczy dla tabel Delta.
PARTITIONED BY

Opcjonalna lista co najmniej jednej kolumny używanej do partycjonowania tabeli.
LOCATION

Opcjonalna lokalizacja przechowywania danych tabeli. Jeśli nie zostanie ustawiona, system będzie domyślnie ustawiony na lokalizację przechowywania potoku.
COMMENT

Opcjonalny opis tabeli.
column_constraint

Opcjonalne ograniczenie klucza podstawowego lub klucza obcego zastosowane w kolumnie.
MASK clause (publiczna wersja zapoznawcza)

Dodaje funkcję maski kolumn do anonimowości poufnych danych. Przyszłe zapytania dla tej kolumny zwracają wynik ocenianej funkcji zamiast oryginalnej wartości kolumny. Jest to przydatne w przypadku szczegółowej kontroli dostępu, ponieważ funkcja może sprawdzić tożsamość użytkownika i członkostwo w grupach, aby zdecydować, czy zredagować wartość.

Zobacz klauzulę mask kolumny .
table_constraint

Opcjonalne informacyjne ograniczenie klucza podstawowego lub klucza obcego w tabeli.
TBLPROPERTIES

Opcjonalna lista właściwości tabeli dla tabeli.
WITH ROW FILTER clause (publiczna wersja zapoznawcza)

Dodaje do tabeli funkcję filtru wierszy. Przyszłe zapytania dotyczące tej tabeli otrzymują podzbiór wierszy, dla których funkcja daje wartość TRUE. Jest to przydatne w przypadku szczegółowej kontroli dostępu, ponieważ umożliwia funkcji inspekcję tożsamości i członkostwa w grupach użytkownika wywołującego, aby zdecydować, czy filtrować niektóre wiersze.

Zobacz klauzulę ROW FILTER.
select_statement

Zapytanie DLT definiujące zestaw danych dla tabeli.
CONSTRAINT, klauzula
EXPECT expectation_name

Zdefiniuj ograniczenie jakości danych expectation_name. Jeśli ograniczenie ON VIOLATION nie jest zdefiniowane, dodaj wiersze naruszające ograniczenie do docelowego zestawu danych.
ON VIOLATION

Działanie opcjonalne w przypadku wierszy, które zakończyły się niepowodzeniem.

- FAIL UPDATE: natychmiastowe zatrzymywanie wykonywania potoku.
- DROP ROW: pomiń rekord i kontynuuj przetwarzanie.

zmienianie przechwytywania danych za pomocą języka SQL w technologii DLT

Użyj instrukcji APPLY CHANGES INTO, aby korzystać z funkcjonalności CDC DLT, zgodnie z opisem poniżej:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Definiujesz ograniczenia dotyczące jakości danych dla obiektu docelowego APPLY CHANGES przy użyciu tej samej klauzuli CONSTRAINT co zapytania inne niżAPPLY CHANGES. Zobacz Zarządzanie jakością danych za pomocą oczekiwań dotyczących potoku.

Notatka

Domyślnym zachowaniem zdarzeń INSERT i UPDATE jest upsert zdarzeń CDC ze źródła: zaktualizować wszystkie wiersze w tabeli docelowej, które pasują do określonych kluczy lub wstawić nowy wiersz, gdy pasujący rekord nie istnieje w tabeli docelowej. Obsługę zdarzeń DELETE można określić przy użyciu warunku APPLY AS DELETE WHEN.

Ważny

Aby zastosować zmiany, należy zadeklarować docelową tabelę przesyłania strumieniowego, do której chcesz zastosować zmiany. Opcjonalnie możesz określić schemat tabeli docelowej. Podczas określania schematu tabeli docelowej APPLY CHANGES należy również uwzględnić kolumny __START_AT i __END_AT o tym samym typie danych co pole sequence_by.

Zobacz interfejsy API "Zastosuj Zmiany": upraszczanie procesu przechwytywania zmian danych za pomocą biblioteki DLT.

Klauzule
KEYS

Kolumna lub kombinacja kolumn, które jednoznacznie identyfikują wiersz w danych źródłowych. Służy do identyfikowania, które zdarzenia CDC mają zastosowanie do określonych rekordów w tabeli docelowej.

Aby zdefiniować kombinację kolumn, użyj rozdzielanej przecinkami listy kolumn.

Ta klauzula jest wymagana.
IGNORE NULL UPDATES

Zezwalaj na pozyskiwanie aktualizacji zawierających podzestaw kolumn docelowych. Gdy zdarzenie CDC pasuje do istniejącego wiersza i zostanie określone jako IGNORUJ AKTUALIZACJE NULL, kolumny z null zachowają istniejące wartości w obszarze docelowym. Dotyczy to także kolumn zagnieżdżonych, które mają wartość null.

Ta klauzula jest opcjonalna.

Domyślnie istniejące kolumny są nadpisywane wartościami null.
APPLY AS DELETE WHEN

Określa, kiedy zdarzenie CDC powinno być traktowane jako DELETE, a nie upsert. Aby obsłużyć dane poza kolejnością, usunięty wiersz jest tymczasowo zachowywany jako znacznik usunięcia w bazowej tabeli Delta, a widok tworzony jest w magazynie metadanych, który filtruje te znaczniki usunięcia. Okres przechowywania można skonfigurować za pomocą tego polecenia
pipelines.cdc.tombstoneGCThresholdInSeconds właściwość tabeli.

Ta klauzula jest opcjonalna.
APPLY AS TRUNCATE WHEN

Określa, kiedy zdarzenie CDC powinno być traktowane jako pełna tabela TRUNCATE. Ponieważ ta klauzula wyzwala pełny obcięcie tabeli docelowej, powinna być używana tylko w określonych przypadkach użycia wymagających tej funkcji.

Klauzula APPLY AS TRUNCATE WHEN jest obsługiwana tylko dla typu SCD 1. Typ SCD 2 nie obsługuje operacji skracania.

Ta klauzula jest opcjonalna.
SEQUENCE BY

Nazwa kolumny określająca kolejność logiczną zdarzeń CDC w danych źródłowych. Biblioteka DLT używa tego sekwencjonowania do obsługi zdarzeń zmiany, które docierają poza kolejnością.

Określona kolumna musi być sortowalnym typem danych.

Ta klauzula jest wymagana.
COLUMNS

Określa podzbiór kolumn do uwzględnienia w tabeli docelowej. Możesz wykonać jedną z następujących czynności:

- Określ pełną listę kolumn do uwzględnienia: COLUMNS (userId, name, city).
- Określ listę kolumn do wykluczenia: COLUMNS * EXCEPT (operation, sequenceNum)

Ta klauzula jest opcjonalna.

Wartością domyślną jest dołączenie wszystkich kolumn do tabeli docelowej, gdy nie określono klauzuli COLUMNS.
STORED AS

Określa, czy rekordy mają być przechowywane jako typ SCD 1, czy SCD, 2.

Ta klauzula jest opcjonalna.

Wartość domyślna to SCD typ 1.
TRACK HISTORY ON

Określa podzbiór kolumn wyjściowych do generowania rekordów historii, gdy istnieją jakiekolwiek zmiany w tych określonych kolumnach. Możesz wykonać jedną z następujących czynności:

- Określ pełną listę kolumn do śledzenia: COLUMNS (userId, name, city).
- Określ listę kolumn, które mają być wykluczone ze śledzenia: COLUMNS * EXCEPT (operation, sequenceNum)

Ta klauzula jest opcjonalna. Wartością domyślną jest śledzenie historii wszystkich kolumn wyjściowych w przypadku jakichkolwiek zmian, co odpowiada TRACK HISTORY ON *.