Поделиться через


Справочник по языку SQL Delta Live Tables

В этой статье содержатся сведения о интерфейсе программирования Delta Live Tables SQL.

  • Дополнительные сведения об API Python см. всправочнике по языку Python Delta Live.
  • Дополнительные сведения о командах SQL см . в справочнике по языку SQL.

В запросах SQL можно использовать определяемые пользователем функции Python, но перед их вызовом в исходных файлах SQL необходимо определить эти определяемые пользователем функции Python. См . раздел "Определяемые пользователем скалярные функции " Python".

Ограничения

Предложение PIVOT не поддерживается. Для выполнения операции pivot в Spark требуется предварительная загрузка всех входных данных для вычисления выходных schema. Эта возможность не поддерживается в Delta Live Tables.

создание материализованного представления или потоковой передачи Delta Live Tablestable

Примечание.

Синтаксис CREATE OR REFRESH LIVE TABLE для создания материализованного представления не рекомендуется. Используйте CREATE OR REFRESH MATERIALIZED VIEW.

При объявлении потоковой table или материализованного представления используется одинаковый базовый синтаксис SQL.

Объявите материализованное представление Delta Live Tables с помощью SQL

Ниже описан синтаксис объявления материализованного представления в Delta Live Tables с помощью SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Объявление потоковой передачи table Delta Tables Live с помощью SQL

Вы можете объявить поток tables только с помощью запросов, выполняемых против источника потоковой передачи. Databricks рекомендует использовать автозагрузчик для приема файлов из облачного хранилища объектов. См . синтаксис SQL автозагрузчика.

При указании других tables или views в конвейере в качестве источников потоковой передачи необходимо включить функцию STREAM() вокруг имени набора данных.

Ниже описан синтаксис объявления потоковой table в Delta Live Tables с помощью SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Создание представления Delta Live Tables

Ниже описан синтаксис для объявления views с помощью SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Синтаксис автоматического загрузчика SQL

Ниже описан синтаксис для работы с Автозагрузчиком в SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

С Автозагрузчиком можно использовать поддерживаемые параметры формата. map() С помощью функции можно передать параметры методуread_files(). Параметры — это пары "ключ-значение", where ключи и values являются строками. Дополнительные сведения о форматах и параметрах поддержки см. в параметрах формата файлов.

Пример. Определение tables

Набор данных можно создать путем считывания из внешнего источника данных или наборов данных, определенных в конвейере. Для чтения из внутреннего набора данных, добавьте ключевое слово LIVE в начало имени набора данных: В следующем примере определяются два разных набора данных: table с именем taxi_raw, который принимает JSON-файл в качестве источника входных данных и table с именем filtered_data, который принимает taxi_rawtable в качестве входных данных:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Пример. Чтение из источника потоковой передачи

Чтобы считывать данные из источника потоковой передачи, например Auto Loader или внутренний набор данных, определите STREAMINGtable:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Дополнительные сведения о потоковой передаче данных см. в разделе Преобразование данных с помощью Delta Live Tables.

Контролируйте, как tables материализуются

Tables также предлагают дополнительный контроль над их материализацией:

  • Укажите, как tablesразделены с помощью PARTITIONED BY. Секционирование можно использовать для ускорения запросов.
  • Свойства можно settable с помощью TBLPROPERTIES. См. свойства Delta Live Tablestable.
  • Set расположение хранилища с помощью параметра LOCATION. По умолчанию данные table хранятся в месте хранения конвейера, если LOCATION не set.
  • Вы можете использовать созданные columns в вашем определении schema. См. пример: укажите schema и partition,columns,.

Примечание.

Для tables менее 1 ТБ по размеру Databricks рекомендует разрешить Delta Live Tables управлять организацией данных. Если вы не ожидаете, что table будет расти за пределами терабайта, Databricks рекомендует не указывать partitioncolumns.

Пример : Укажите schema и partitioncolumns

Вы можете дополнительно указать schema при определении table. В следующем примере указывается schema для целевого table, включая использование Delta Lake созданных columns и определения partitioncolumns для table:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

По умолчанию Delta Live Tables выводит schema из определения table, если не указать schema.

пример . Определение ограничений table

Примечание.

Поддержка Tables Delta Live для ограничений table находится в общедоступной предварительной версии. Чтобы определить ограничения table, конвейер должен быть конвейером Catalogс поддержкой Unity и настроен для использования канала preview.

При указании schemaможно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. предложение CONSTRAINT в справочнике по языку SQL.

В следующем примере table определяется с помощью первичного и внешнего ключа constraint:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Параметризация values, используемая при объявлении tables или views с помощью SQL

Используйте SET, чтобы указать значение конфигурации в запросе, объявляющем table или представление, включая конфигурации Spark. Любое table или представление, определенное в ноутбуке после инструкции SET, имеет доступ к уже определённому значению. Все конфигурации Spark, указанные с помощью инструкции SET, используются при выполнении запроса Spark для любого table или представления после инструкции SET. Чтобы считать значение конфигурации в запросе, используйте синтаксис интерполяции строк ${}. В следующем примере задается значение конфигурации Spark с именем startDate, которое используется в запросе:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Чтобы указать несколько конфигураций values, используйте отдельный оператор SET для каждого значения.

пример. Определение фильтра строк и маски column

Внимание

Фильтры строк и маски column находятся в общедоступной предварительной версии.

Чтобы создать материализованное представление или потоковую table с фильтром строк и маской column, используйте предложение ROW FILTER и предложение MASK. В следующем примере показано, как определить материализованное представление и потоковой table с фильтром строк и маской column.

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

Дополнительные сведения о фильтрах строк и масках column см. в разделе публикация tables с фильтрами строк и column масками.

Свойства SQL

CREATE TABLE или VIEW
TEMPORARY

Создайте table, но не публикуйте метаданные для table. Предложение TEMPORARY указывает Delta Live Tables создать table, доступ к которому доступен конвейеру, но не должен быть доступен за пределами конвейера. Чтобы сократить время обработки, временный table сохраняется в течение всего времени существования конвейера, который создает его, а не только один update.
STREAMING

Создайте элемент table, который считывает входной набор данных в виде потока. Входной набор данных должен быть источником потоковых данных, например, Auto Loader или STREAMINGtable.
CLUSTER BY

Включите кластеризацию жидкости в table и определите columns для использования в качестве ключей кластеризации.

См. раздел Использование кластеризации жидкости для Delta tables.
PARTITIONED BY

Необязательный list, который может включать один или несколько columns для разделения table.
LOCATION

Необязательное место хранения для данных table. Если это не set, система по умолчанию выберет местоположение хранилища конвейера.
COMMENT

Необязательное описание для table.
column_constraint

Необязательный информационный первичный ключ или внешний ключ constraint для column.
MASK clause (общедоступная предварительная версия)

Добавляет функцию маски column для анонимизации конфиденциальных данных. Будущие запросы для этого column будут возвращать результат вычисленной функции вместо исходного значения column. Это полезно для точного управления доступом, так как функция может проверить удостоверение пользователя и членство в группах, чтобы решить, следует ли изменить значение.

См. маски Column согласно пункту.
table_constraint

Необязательный информационный первичный ключ или внешний ключ constraint на table.
TBLPROPERTIES

Необязательный list свойств table для table.
WITH ROW FILTER clause (общедоступная предварительная версия)

Добавляет функцию фильтра строк в table. Будущие запросы для этого table получают подмножество строк, для которых функция возвращает значение TRUE. Это полезно для точного управления доступом, так как она позволяет функции проверять удостоверения и членства в группах вызывающего пользователя, чтобы решить, следует ли фильтровать определенные строки.

См. пункт.
select_statement

Запрос Delta Live Tables, определяющий набор данных для table.
предложение CONSTRAINT
EXPECT expectation_name

Качество данных — constraintexpectation_name. Если ON VIOLATIONconstraint не определен, добавьте строки, которые нарушают constraint, в целевой набор данных.
ON VIOLATION

Необязательное действие для неудачных строк:

- FAIL UPDATE: Немедленно останавливает выполнение конвейера.
- DROP ROW: Удаление записи и продолжение обработки.

Отслеживание изменений данных с помощью SQL в Delta Live Tables

Используйте инструкцию APPLY CHANGES INTO для использования функции Delta Live Tables CDC, как описано в следующем разделе:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Вы определяете ограничения качества данных для целевого APPLY CHANGES объекта, используя то же CONSTRAINT предложение, что и запросы, отличныеAPPLY CHANGES от запросов. См. Управление качеством данных с ожиданиями конвейера.

Примечание.

Поведение по умолчанию для событий INSERT и UPDATE заключается в событиях upsert CDC из источника: update все строки в целевом table, соответствующие указанным ключам или insert новую строку, если соответствующая запись не существует в целевой table. Способ обработки событий DELETE можно указать с помощью условия APPLY AS DELETE WHEN.

Внимание

Для внесения изменений необходимо указать целевой поток table. При необходимости можно указать schema для tableцели. При указании schema целевого tableAPPLY CHANGES необходимо также включить __START_AT и __END_ATcolumns с тем же типом данных, что и поле sequence_by.

См. Как интерфейсы APPLY CHANGES упрощают отслеживание изменений данных с помощью Delta Live Tables.

Предложения
KEYS

column или комбинация columns, однозначно определяющая строку в исходных данных. Это используется для определения, какие события CDC применяются к конкретным записям в целевом объекте table.

Чтобы определить сочетание columns, используйте разделенные запятыми listcolumns.

Это предложение обязательно.
IGNORE NULL UPDATES

Разрешить принимать обновления, содержащие подмножество целевого columns. При совпадении события CDC с существующей строкой и указан параметр IGNORE NULL UPDATES, columns и null сохранят свои существующие values в целевом элементе. Это также относится к вложенным columns со значением null.

Предложение не является обязательным.

По умолчанию существующие columns перезаписываются на nullvalues.
APPLY AS DELETE WHEN

Указывает, в каких случаях событие CDC необходимо обрабатывать как DELETE, а не как upsert. Чтобы обрабатывать данные вне порядка, удаленная строка временно сохраняется в качестве могилы в базовом разностном table, а представление создается в хранилище метаданных, которое фильтрует эти могилы. Интервал хранения можно настроить с помощью свойства таблицы
свойство pipelines.cdc.tombstoneGCThresholdInSecondstable.

Предложение не является обязательным.
APPLY AS TRUNCATE WHEN

Указывает, когда событие CDC должно рассматриваться как полное tableTRUNCATE. Поскольку этот пункт активирует полное усечение целевого table, его следует использовать только для конкретных случаев, требующих этой возможности.

Предложение APPLY AS TRUNCATE WHEN поддерживается только для SCD типа 1. ScD типа 2 не поддерживает операцию усечения.

Предложение не является обязательным.
SEQUENCE BY

Имя column, указывающее логический порядок событий CDC в исходных данных. Delta Live Tables использует эту последовательность для обработки событий изменений, приходящих в неправильном порядке.

Указанный column должен быть сортируемым типом данных.

Это предложение обязательно.
COLUMNS

Указывает подмножество columns для включения в целевой table. Вы можете сделать одно из двух:

— Укажите полный list для columns, чтобы включить: COLUMNS (userId, name, city).
— укажите значение list для columns, чтобы исключить: COLUMNS * EXCEPT (operation, sequenceNum)

Предложение не является обязательным.

По умолчанию необходимо включить все columns в целевой table, если предложение COLUMNS не указано.
STORED AS

Определяет, следует ли хранить записи в виде SCD типа 1 или SCD типа 2.

Предложение не является обязательным.

Значение по умолчанию — SCD типа 1.
TRACK HISTORY ON

Задает подмножество записей журнала истории от columns до generate, если есть изменения в указанных columns. Вы можете сделать одно из двух:

— Укажите полный номер list и columns для отслеживания: COLUMNS (userId, name, city).
— укажите list из columns, чтобы исключить из отслеживания: COLUMNS * EXCEPT (operation, sequenceNum)

Предложение не является обязательным. Значение по умолчанию — отслеживать историю всех выходных columns при наличии изменений, что эквивалентно TRACK HISTORY ON *.