Поделиться через


Руководство: Запуск вашего первого DLT-пайплайна

В этом руководстве описано, как настроить первый конвейер DLT, написать базовый код ETL и запустить обновление конвейера.

Все действия, описанные в этом руководстве, предназначены для рабочих областей с включенным каталогом Unity. Вы также можете настроить конвейеры DLT для работы с устаревшим хранилищем метаданных Hive. См. статью Использование конвейеров DLT с устаревшим хранилищем метаданных Hive.

Заметка

В этом руководстве содержатся инструкции по разработке и проверке нового кода конвейера с помощью записных книжек Databricks. Конвейеры также можно настроить с помощью исходного кода в файлах Python или SQL.

Конвейер можно настроить для запуска кода, если у вас уже есть исходный код, написанный с помощью синтаксиса DLT. См. настройте конвейер DLT.

Вы можете использовать полностью декларативный синтаксис SQL в Databricks SQL для регистрации и задания расписаний обновления для материализованных представлений и потоковых таблиц в качестве объектов, управляемых каталогом Unity. См. статью Использование материализованных представлений в Databricks SQL и Загрузка данных с использованием потоковых таблиц в Databricks SQL.

Пример: прием и обработка данных о именах детей в Нью-Йорке

В примере в этой статье используется общедоступный набор данных, содержащий записи имена детей штата Нью-Йорк. В этом примере показано использование конвейера DLT для:

  • Чтение необработанных CSV-данных из тома в таблицу.
  • Прочитайте записи из таблицы поступающих данных и используйте ожидания DLT и для создания новой таблицы, содержащей очищенные данные.
  • Используйте очищенные записи в качестве входных данных для запросов DLT, которые создают производные наборы данных.

Этот код демонстрирует упрощенный пример архитектуры медальона. См. Что такое архитектура «medallion lakehouse»?.

Реализации этого примера предоставляются для Python и SQL. Выполните действия, чтобы создать конвейер и записную книжку, а затем скопируйте предоставленный код.

Также предоставляются примеры записных книжек с полным кодом.

Требования

  • Чтобы запустить конвейер, необходимо иметь разрешение на создание кластера или доступ к политике кластера, определяющей кластер DLT. Среда выполнения DLT инициализирует кластер перед запуском конвейера и выдает ошибку, если у вас нет правильного разрешения.
  • Все пользователи могут активировать обновления с помощью бессерверных конвейеров по умолчанию. Бессерверные технологии должны быть включены на уровне учетной записи и могут быть недоступны в регионе вашей рабочей области. См. Включение бессерверных вычислений.
  • Примеры, приведенные в этом руководстве, используют Unity Catalog. Databricks рекомендует создать новую схему для выполнения этого руководства, так как в целевой схеме создаются несколько объектов базы данных.

    • Чтобы создать новую схему в каталоге, необходимо иметь ALL PRIVILEGES или USE CATALOG и CREATE SCHEMA привилегии.
    • Если создать новую схему невозможно, выполните это руководство по существующей схеме. У вас должны быть следующие привилегии:
      • USE CATALOG для родительского каталога.
      • Привилегии ALL PRIVILEGES или USE SCHEMA, CREATE MATERIALIZED VIEWи CREATE TABLE в целевой схеме.
    • В этом руководстве используется том для хранения примеров данных. Databricks рекомендует создать новый том для этого руководства. При создании новой схемы для этого руководства можно создать новый том в этой схеме.
      • Чтобы создать новый том в существующей схеме, необходимо иметь следующие привилегии:
        • USE CATALOG для родительского каталога.
        • ALL PRIVILEGES или USE SCHEMA, и CREATE VOLUME привилегия в целевой схеме.
      • При необходимости можно использовать существующий том. У вас должны быть следующие привилегии:
        • USE CATALOG для родительского каталога.
        • USE SCHEMA для родительской схемы.
        • ALL PRIVILEGES или READ VOLUME и WRITE VOLUME на целевом томе.

    Чтобы задать эти разрешения, обратитесь к администратору Databricks. Дополнительные сведения о привилегиях каталога Unity см. в привилегиях каталога Unity и защищаемых объектах.

шаг 0. Скачивание данных

В этом примере загружаются данные из тома каталога Unity. Следующий код скачивает CSV-файл и сохраняет его в указанном томе. Откройте новую записную книжку и запустите следующий код, чтобы скачать эти данные в указанный том:

import urllib

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

urllib.request.urlretrieve(download_url, volume_path + filename)

Замените <catalog-name>, <schema-name>и <volume-name> каталогом, схемой и именами томов для тома каталога Unity. Предоставленный код пытается создать указанную схему и том, если эти объекты не существуют. У вас должны быть соответствующие привилегии для создания и записи в объекты в каталоге Unity. См. требований.

Заметка

Убедитесь, что эта записная книжка успешно запущена, прежде чем продолжить работу с руководством. Не настраивайте эту записную книжку как часть конвейера.

шаг 1. Создание конвейера

DLT создает конвейеры путем разрешения зависимостей, определенных в записных книжках или файлах (называемых исходного кода) с помощью синтаксиса DLT. Каждый файл исходного кода может содержать только один язык, но в конвейер можно добавить несколько записных книжек или файлов, относящихся к определенному языку.

Важный

Не настраивайте ресурсы в поле исходного кода. Оставление этого поля пустым создает и настраивает записную книжку для написания исходного кода.

Инструкции из этого руководства используют бессерверные вычислительные ресурсы и каталог Unity. Используйте параметры по умолчанию для всех параметров конфигурации, не указанных в этих инструкциях.

Заметка

Если бессерверные вычисления не включены или не поддерживаются в вашей рабочей области, вы можете следовать руководству, в соответствии с параметрами вычислений по умолчанию. Необходимо вручную выбрать каталог Unity в разделе параметры хранилища в разделе Назначение интерфейса Создать конвейер.

Чтобы настроить новый конвейер, выполните следующие действия.

  1. На боковой панели щелкните DLT.
  2. Щелкните Создать конвейер.
  3. В имени конвейеравведите уникальное имя конвейера.
  4. Установите этот флажок Serverless.
  5. В пункте назначения, чтобы настроить место в каталоге Unity, где публикуются таблицы, выберите каталог и схему.
  6. В Advancedщелкните Добавить конфигурации, а затем задайте параметры пайплайна для каталога, схемы и тома, в которые вы загрузили данные, используя следующие имена параметров:
    • my_catalog
    • my_schema
    • my_volume
  7. Щелкните Создать.

Пользовательский интерфейс конвейеров отображается для нового конвейера. Записная книжка исходного кода автоматически создается и настроена для конвейера.

Записная книжка создается в новом каталоге в пользовательском каталоге. Имя нового каталога и файла совпадает с именем конвейера. Например, /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Ссылка для доступа к этой записной книжке находится в поле исходный код на панели сведений о конвейере. Щелкните ссылку, чтобы открыть записную книжку, прежде чем перейти к следующему шагу.

Шаг 2. Объявление материализованных представлений и потоковых таблиц в записной книжке с помощью Python или SQL

Записные книжки Datbricks можно использовать для интерактивной разработки и проверки исходного кода для конвейеров DLT. Чтобы использовать эту функцию, необходимо подключить записную книжку к конвейеру. Чтобы подключить только что созданную записную книжку к созданному конвейеру, выполните следующие действия.

  1. Щелкните Подключить в правом верхнем углу, чтобы открыть меню конфигурации вычислений.
  2. Наведите указатель мыши на имя конвейера, созданного на шаге 1.
  3. Нажмите Подключиться.

Изменения пользовательского интерфейса для включения кнопок "Проверить" и "Пуск" в правом верхнем углу. Дополнительные сведения о поддержке разработки кода конвейера см. в статье Разработка и отладка конвейеров DLT в записных книжках.

Важный

  • Конвейеры DLT во время планирования оценивают все ячейки ноутбука. В отличие от ноутбуков, используемых для вычислений общего назначения или назначенных как задания, конвейеры не гарантируют, что ячейки запускаются в указанном порядке.
  • Записные книжки могут содержать только один язык программирования. Не смешивайте код Python и SQL в записных книжках исходного кода потока данных.

Дополнительные сведения о разработке кода с помощью Python или SQL см. в статье Разработка кода конвейера с помощью Python или Разработка кода конвейера с помощью SQL.

Пример кода конвейера

Чтобы реализовать пример в этом руководстве, скопируйте и вставьте следующий код в ячейку в записной книжке, настроенной как исходный код для конвейера.

Указанный код выполняет следующие действия:

  • Импортирует необходимые модули (только Python).
  • Ссылается на параметры, определенные во время настройки конвейера.
  • Определяет потоковую таблицу с именем baby_names_raw, загружающую данные из тома.
  • Определяет материализованное представление с именем baby_names_prepared, которое проверяет полученные данные.
  • Определяет материализованное представление с именем top_baby_names_2021 с высоким уровнем уточнений представления данных.

Питон

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

шаг 3. Запуск обновления конвейера

Чтобы запустить обновление конвейера, нажмите кнопку Пуск в правом верхнем углу пользовательского интерфейса записной книжки.

примеры записных книжек

Следующие записные книжки содержат те же примеры кода, которые приведены в этой статье. Эти записные книжки имеют те же требования, что и действия, описанные в этой статье. См. требования.

Чтобы импортировать записную книжку, выполните следующие действия.

  1. Откройте интерфейс записной книжки.
    • Щелкните + Создать>записную книжку.
    • Откроется пустая записная книжка.
  2. Щелкните Файл>импорт.... Откроется диалоговое окно импорта .
  3. Выберите параметр URL-адреса для импорта из.
  4. Вставьте URL-адрес записной книжки.
  5. Щелкните Импорт.

В этом руководстве необходимо запустить записную книжку установки данных перед настройкой и запуском конвейера DLT. Импортируйте следующую записную книжку, подключите ее к вычислительному ресурсу, укажите необходимые переменные для my_catalog, my_schemaи my_volumeи нажмите Выполнить все.

Руководство по скачиванию данных для конвейеров

Получение записной книжки

В следующих записных книжках приведены примеры в Python или SQL. При импорте записной книжки он сохраняется в домашнем каталоге пользователя.

После импорта одной из представленных ниже записных книжек выполните шаги по созданию конвейера, но используйте средство выбора файла исходного кода для выбора скачанной записной книжки. После создания конвейера с записной книжкой, настроенной как исходный код, в пользовательском интерфейсе конвейера щелкните Запустить, чтобы запустить обновление.

Начало работы с записной книжкой Python DLT

Получение записной книжки

Начните работать с записной книжкой DLT SQL

Получение записной книжки

Дополнительные ресурсы