Разработка кода конвейера с помощью SQL
Delta Live Table представляет несколько новых ключевых слов и функций SQL для определения материализованных представлений и потоковых таблиц в конвейерах. Поддержка SQL для разработки конвейеров основана на основах Spark SQL и добавляет поддержку функций структурированной потоковой передачи.
Пользователи, знакомые с PySpark DataFrames, могут предпочесть разработку кода конвейера с помощью Python. Python поддерживает более обширное тестирование и операции, которые сложно реализовать с помощью SQL, такие как операции метапрограммирования. См. статью "Разработка кода конвейера с помощью Python".
Полный справочник по синтаксису SQL Delta Live Table см. в справочнике по языку SQL Delta Live Table.
Основы SQL для разработки конвейеров
Код SQL, создающий наборы данных Delta Live Table, использует CREATE OR REFRESH
синтаксис для определения материализованных представлений и потоковых таблиц в результатах запроса.
Ключевое STREAM
слово указывает, должен ли источник данных, на который ссылается предложение SELECT
, считываться с семантикой потоковой передачи.
Исходный код Разностных динамических таблиц критически отличается от скриптов SQL: Delta Live Tables оценивает все определения набора данных во всех файлах исходного кода, настроенных в конвейере, и создает граф потока данных перед выполнением любых запросов. Порядок запросов, отображаемых в записной книжке или скрипте, не определяет порядок выполнения.
Создание материализованного представления с помощью SQL
В следующем примере кода показан базовый синтаксис для создания материализованного представления с помощью SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Создание таблицы потоковой передачи с помощью SQL
В следующем примере кода показан базовый синтаксис для создания таблицы потоковой передачи с помощью SQL:
Примечание.
Не все источники данных поддерживают потоковое чтение, а некоторые источники данных всегда должны обрабатываться с семантикой потоковой передачи.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Загрузка данных из хранилища объектов
Delta Live Tables поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См . параметры формата данных.
Примечание.
В этих примерах используются данные, доступные /databricks-datasets
в автоматически подключенной к рабочей области. Databricks рекомендует использовать пути тома или облачные URI для ссылки на данные, хранящиеся в облачном хранилище объектов. См. раздел "Что такое тома каталога Unity?".
Databricks рекомендует использовать автозагрузчик и потоковые таблицы при настройке добавочных рабочих нагрузок приема данных, хранящихся в облачном хранилище объектов. См. статью об автозагрузчике.
SQL использует функцию read_files
для вызова функции автозагрузчика. Необходимо также использовать ключевое STREAM
слово для настройки потокового чтения с read_files
помощью .
В следующем примере создается потоковая таблица из JSON-файлов с помощью автозагрузчика:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Функция read_files
также поддерживает пакетную семантику для создания материализованных представлений. В следующем примере используется пакетная семантика для чтения каталога JSON и создания материализованного представления:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Проверка данных с ожидаемыми ожиданиями
Вы можете использовать ожидания для установки и применения ограничений качества данных. См. Управление качеством данных с помощью ожиданий конвейера.
Следующий код определяет ожидание, которое valid_data
удаляет записи, которые являются null во время приема данных:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Запрос материализованных представлений и таблиц потоковой передачи, определенных в конвейере
Используйте схему LIVE
для запроса других материализованных представлений и таблиц потоковой передачи, определенных в конвейере.
В следующем примере определяются четыре набора данных:
- Потоковая таблица с именем
orders
, которая загружает данные JSON. - Материализованное представление с именем
customers
, которое загружает данные CSV. - Материализованное представление с именем
customer_orders
, которое объединяет записи изorders
наборов данных иcustomers
присваиваетcustomer_id
метку времени заказа дате и выбирает поля ,order_number
state
а также поля .order_date
- Материализованное представление с именем
daily_orders_by_state
, которое объединяет ежедневное количество заказов для каждого состояния.
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;