Поделиться через


Разработка кода конвейера с помощью SQL

Delta Live Table представляет несколько новых ключевых слов и функций SQL для определения материализованных представлений и потоковых таблиц в конвейерах. Поддержка SQL для разработки конвейеров основана на основах Spark SQL и добавляет поддержку функций структурированной потоковой передачи.

Пользователи, знакомые с PySpark DataFrames, могут предпочесть разработку кода конвейера с помощью Python. Python поддерживает более обширное тестирование и операции, которые сложно реализовать с помощью SQL, такие как операции метапрограммирования. См. статью "Разработка кода конвейера с помощью Python".

Полный справочник по синтаксису SQL Delta Live Table см. в справочнике по языку SQL Delta Live Table.

Основы SQL для разработки конвейеров

Код SQL, создающий наборы данных Delta Live Table, использует CREATE OR REFRESH синтаксис для определения материализованных представлений и потоковых таблиц в результатах запроса.

Ключевое STREAM слово указывает, должен ли источник данных, на который ссылается предложение SELECT , считываться с семантикой потоковой передачи.

Исходный код Разностных динамических таблиц критически отличается от скриптов SQL: Delta Live Tables оценивает все определения набора данных во всех файлах исходного кода, настроенных в конвейере, и создает граф потока данных перед выполнением любых запросов. Порядок запросов, отображаемых в записной книжке или скрипте, не определяет порядок выполнения.

Создание материализованного представления с помощью SQL

В следующем примере кода показан базовый синтаксис для создания материализованного представления с помощью SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Создание таблицы потоковой передачи с помощью SQL

В следующем примере кода показан базовый синтаксис для создания таблицы потоковой передачи с помощью SQL:

Примечание.

Не все источники данных поддерживают потоковое чтение, а некоторые источники данных всегда должны обрабатываться с семантикой потоковой передачи.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Загрузка данных из хранилища объектов

Delta Live Tables поддерживает загрузку данных из всех форматов, поддерживаемых Azure Databricks. См . параметры формата данных.

Примечание.

В этих примерах используются данные, доступные /databricks-datasets в автоматически подключенной к рабочей области. Databricks рекомендует использовать пути тома или облачные URI для ссылки на данные, хранящиеся в облачном хранилище объектов. См. раздел "Что такое тома каталога Unity?".

Databricks рекомендует использовать автозагрузчик и потоковые таблицы при настройке добавочных рабочих нагрузок приема данных, хранящихся в облачном хранилище объектов. См. статью об автозагрузчике.

SQL использует функцию read_files для вызова функции автозагрузчика. Необходимо также использовать ключевое STREAM слово для настройки потокового чтения с read_filesпомощью .

В следующем примере создается потоковая таблица из JSON-файлов с помощью автозагрузчика:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Функция read_files также поддерживает пакетную семантику для создания материализованных представлений. В следующем примере используется пакетная семантика для чтения каталога JSON и создания материализованного представления:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Проверка данных с ожидаемыми ожиданиями

Вы можете использовать ожидания для установки и применения ограничений качества данных. См. Управление качеством данных с помощью ожиданий конвейера.

Следующий код определяет ожидание, которое valid_data удаляет записи, которые являются null во время приема данных:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Запрос материализованных представлений и таблиц потоковой передачи, определенных в конвейере

Используйте схему LIVE для запроса других материализованных представлений и таблиц потоковой передачи, определенных в конвейере.

В следующем примере определяются четыре набора данных:

  • Потоковая таблица с именем orders , которая загружает данные JSON.
  • Материализованное представление с именем customers , которое загружает данные CSV.
  • Материализованное представление с именемcustomer_orders, которое объединяет записи из orders наборов данных и customers присваивает customer_idметку времени заказа дате и выбирает поля , order_numberstateа также поля .order_date
  • Материализованное представление с именем daily_orders_by_state , которое объединяет ежедневное количество заказов для каждого состояния.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;