Разработка кода конвейера с помощью SQL
DLT представляет несколько новых ключевых слов и функций SQL для определения материализованных представлений и потоковых таблиц в конвейерах. Поддержка SQL для разработки конвейеров основана на основах Spark SQL и добавляет поддержку функций структурированной потоковой передачи.
Пользователи, знакомые с PySpark DataFrames, могут предпочесть разработку кода конвейера с помощью Python. Python поддерживает более обширное тестирование и операции, которые сложно реализовать с помощью SQL, такие как операции метапрограммирования. См. раздел Разработка кода конвейера с помощью Python.
Полный справочник по синтаксису DLT SQL см. в справочнике по языку DLT SQL.
Основы SQL для разработки конвейеров
Код SQL, создающий наборы данных DLT, использует синтаксис CREATE OR REFRESH
для определения материализованных представлений и потоковых таблиц в результатах запроса.
Ключевое слово STREAM
указывает, должен ли источник данных, на который ссылается предложение SELECT
, считываться с семантикой потоковой передачи.
Считывает и записывает данные по умолчанию в каталог и схему, указанную во время настройки конвейера. См. Установка целевого каталога и схемы.
Исходный код DLT критически отличается от скриптов SQL: DLT оценивает все определения набора данных во всех файлах исходного кода, настроенных в конвейере, и создает граф потока данных перед выполнением любых запросов. Порядок запросов, отображаемых в записной книжке или скрипте, определяет порядок вычисления кода, но не порядок выполнения запроса.
Создание материализованного представления с помощью SQL
В следующем примере кода показан базовый синтаксис для создания материализованного представления с помощью SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Создание таблицы потоковой передачи с помощью SQL
В следующем примере кода показан базовый синтаксис для создания таблицы потоковой передачи с помощью SQL:
Заметка
Не все источники данных поддерживают потоковое чтение, а некоторые источники данных всегда должны обрабатываться с семантикой потоковой передачи.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Загрузка данных из хранилища объектов
DLT поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См. параметры формата данных .
Заметка
В этих примерах используются данные, доступные на /databricks-datasets
, автоматически подключённые к вашей рабочей области. Databricks рекомендует использовать пути тома или облачные URI для ссылки на данные, хранящиеся в облачном хранилище объектов. См. статью Что такое тома каталога Unity?.
Databricks рекомендует использовать автозагрузчик и потоковые таблицы при настройке добавочных рабочих нагрузок приема данных, хранящихся в облачном хранилище объектов. См. Что такое автозагрузчик?.
SQL использует функцию read_files
для вызова функций автозагрузчика. Необходимо также использовать ключевое слово STREAM
для настройки потокового чтения с помощью read_files
.
В следующем примере создается потоковая таблица из JSON-файлов с помощью автозагрузчика:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Функция read_files
также поддерживает пакетную семантику для создания материализованных представлений. В следующем примере используется пакетная семантика для чтения каталога JSON и создания материализованного представления:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Проверка данных с ожиданиями
Вы можете использовать ожидания для установки и применения ограничений качества данных. См. Управление качеством данных с ожиданиями конвейера.
Следующий код определяет ожидание с именем valid_data
, которое удаляет записи, которые являются null во время приема данных:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Выполните запрос к материализованным представлениям и потоковым таблицам, которые определены в вашем конвейере
В следующем примере определяются четыре набора данных:
- Потоковая таблица с именем
orders
, которая загружает данные JSON. - Материализованное представление с именем
customers
, которое загружает данные CSV. - Материализованное представление с именем
customer_orders
, которое объединяет записи из наборов данныхorders
иcustomers
, преобразует метку времени заказа в дату и выбирает поляcustomer_id
,order_number
,state
иorder_date
. - Материализованное представление под именем
daily_orders_by_state
, которое агрегирует ежедневное количество заказов для каждого штата.
Заметка
При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблицы orders
, customers
и customer_orders
записываются и считываются из каталога по умолчанию и схемы, настроенной для конвейера.
Устаревший режим публикации использует схему LIVE
для запроса других материализованных представлений и потоковых таблиц, определенных в конвейере. В новых конвейерах синтаксис схемы LIVE
автоматически игнорируется. См. схему LIVE (устаревшую версию).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;