Поделиться через


Разработка кода конвейера с помощью SQL

DLT представляет несколько новых ключевых слов и функций SQL для определения материализованных представлений и потоковых таблиц в конвейерах. Поддержка SQL для разработки конвейеров основана на основах Spark SQL и добавляет поддержку функций структурированной потоковой передачи.

Пользователи, знакомые с PySpark DataFrames, могут предпочесть разработку кода конвейера с помощью Python. Python поддерживает более обширное тестирование и операции, которые сложно реализовать с помощью SQL, такие как операции метапрограммирования. См. раздел Разработка кода конвейера с помощью Python.

Полный справочник по синтаксису DLT SQL см. в справочнике по языку DLT SQL.

Основы SQL для разработки конвейеров

Код SQL, создающий наборы данных DLT, использует синтаксис CREATE OR REFRESH для определения материализованных представлений и потоковых таблиц в результатах запроса.

Ключевое слово STREAM указывает, должен ли источник данных, на который ссылается предложение SELECT, считываться с семантикой потоковой передачи.

Считывает и записывает данные по умолчанию в каталог и схему, указанную во время настройки конвейера. См. Установка целевого каталога и схемы.

Исходный код DLT критически отличается от скриптов SQL: DLT оценивает все определения набора данных во всех файлах исходного кода, настроенных в конвейере, и создает граф потока данных перед выполнением любых запросов. Порядок запросов, отображаемых в записной книжке или скрипте, определяет порядок вычисления кода, но не порядок выполнения запроса.

Создание материализованного представления с помощью SQL

В следующем примере кода показан базовый синтаксис для создания материализованного представления с помощью SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Создание таблицы потоковой передачи с помощью SQL

В следующем примере кода показан базовый синтаксис для создания таблицы потоковой передачи с помощью SQL:

Заметка

Не все источники данных поддерживают потоковое чтение, а некоторые источники данных всегда должны обрабатываться с семантикой потоковой передачи.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Загрузка данных из хранилища объектов

DLT поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См. параметры формата данных .

Заметка

В этих примерах используются данные, доступные на /databricks-datasets, автоматически подключённые к вашей рабочей области. Databricks рекомендует использовать пути тома или облачные URI для ссылки на данные, хранящиеся в облачном хранилище объектов. См. статью Что такое тома каталога Unity?.

Databricks рекомендует использовать автозагрузчик и потоковые таблицы при настройке добавочных рабочих нагрузок приема данных, хранящихся в облачном хранилище объектов. См. Что такое автозагрузчик?.

SQL использует функцию read_files для вызова функций автозагрузчика. Необходимо также использовать ключевое слово STREAM для настройки потокового чтения с помощью read_files.

В следующем примере создается потоковая таблица из JSON-файлов с помощью автозагрузчика:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Функция read_files также поддерживает пакетную семантику для создания материализованных представлений. В следующем примере используется пакетная семантика для чтения каталога JSON и создания материализованного представления:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Проверка данных с ожиданиями

Вы можете использовать ожидания для установки и применения ограничений качества данных. См. Управление качеством данных с ожиданиями конвейера.

Следующий код определяет ожидание с именем valid_data, которое удаляет записи, которые являются null во время приема данных:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Выполните запрос к материализованным представлениям и потоковым таблицам, которые определены в вашем конвейере

В следующем примере определяются четыре набора данных:

  • Потоковая таблица с именем orders, которая загружает данные JSON.
  • Материализованное представление с именем customers, которое загружает данные CSV.
  • Материализованное представление с именем customer_orders, которое объединяет записи из наборов данных orders и customers, преобразует метку времени заказа в дату и выбирает поля customer_id, order_number, stateи order_date.
  • Материализованное представление под именем daily_orders_by_state, которое агрегирует ежедневное количество заказов для каждого штата.

Заметка

При запросе представлений или таблиц в конвейере можно указать каталог и схему напрямую или использовать значения по умолчанию, настроенные в конвейере. В этом примере таблицы orders, customersи customer_orders записываются и считываются из каталога по умолчанию и схемы, настроенной для конвейера.

Устаревший режим публикации использует схему LIVE для запроса других материализованных представлений и потоковых таблиц, определенных в конвейере. В новых конвейерах синтаксис схемы LIVE автоматически игнорируется. См. схему LIVE (устаревшую версию).

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;