Руководство: Запуск первого конвейера Delta Live Tables
В этом руководстве описано, как настроить первый конвейер Delta Live Tables, написать базовый код ETL и запустить конвейер update.
Все шаги в этом руководстве предназначены для рабочих пространств с включенным Unity Catalog. Вы также можете настроить конвейеры Delta Live Tables для работы с устаревшим хранилищем метаданных Hive. См. статью Использование конвейеров Delta Live Tables с устаревшим хранилищем метаданных Hive.
Примечание.
В этом руководстве содержатся инструкции по разработке и проверке нового кода конвейера с помощью записных книжек Databricks. Конвейеры также можно настроить с помощью исходного кода в файлах Python или SQL.
Конвейер можно настроить для запуска кода, если у вас уже есть исходный код, написанный с помощью синтаксиса Delta Live Tables. См. Настройте конвейер Delta Live Tables.
Вы можете использовать полностью декларативный синтаксис SQL в Databricks SQL для регистрации и планирования setrefresh расписаний для материализованных views и потоковых tables данных как управляемых объектов Unity Catalog. См. статью "Использование материализованных таблиц" views в Databricks SQL и "Потоковая загрузка данных" tables в Databricks SQL.
Пример: прием и обработка данных о именах детей в Нью-Йорке
В примере в этой статье используется общедоступный набор данных, содержащий записи имен ребенка штата Нью-Йорк. В этом примере показано использование конвейера Delta Live Tables для:
- Считайте необработанные данные CSV из тома в table.
- Считывайте записи из загрузки table и используйте ожидания Delta Live Tables для создания новой table с очищенными данными.
- Используйте очищенные записи в качестве входных данных для запросов Delta Live Tables, которые создают производные наборы данных.
Этот код демонстрирует упрощенный пример архитектуры медальона. См. статью "Что такое архитектура медальона lakehouse?".
Реализации этого примера предоставляются для Python и SQL. Выполните действия, чтобы создать конвейер и записную книжку, а затем скопируйте предоставленный код.
Также предоставляются примеры записных книжек с полным кодом.
Требования
Чтобы запустить конвейер, необходимо иметь разрешение на создание кластера или доступ к политике кластера, определяющей кластер Delta Live Tables. Среда выполнения Delta Live Tables создаёт кластер перед запуском вашего конвейера и завершается с ошибкой, если у вас нет необходимых разрешений.
Все пользователи могут активировать обновления с помощью бессерверных конвейеров по умолчанию. Бессерверные должны быть включены на уровне учетной записи и могут быть недоступны в регионе рабочей области. См. раздел "Включить бессерверные вычисления".
Примеры, приведенные в этом учебном материале, используют Unity Catalog. Databricks рекомендует создать новую schema для выполнения этого руководства, так как в целевой schemaсоздаются несколько объектов базы данных.
- Чтобы создать новую schema в catalog, необходимо иметь
ALL PRIVILEGES
илиUSE CATALOG
иCREATE SCHEMA
привилегии. - Если создать новую schemaнельзя, выполните это руководство на существующем schema. У вас должны быть следующие привилегии:
-
USE CATALOG
для родительского catalog. -
ALL PRIVILEGES
илиUSE SCHEMA
,CREATE MATERIALIZED VIEW
иCREATE TABLE
привилегий на целевом schema.
-
- В этом руководстве используется том для хранения примеров данных. Databricks рекомендует создать новый том для этого руководства. При создании нового schema для этого руководства, вы можете создать новый раздел в нём schema.
- Чтобы создать новый том в существующей schema, необходимо иметь следующие привилегии:
-
USE CATALOG
для родительского catalog. -
ALL PRIVILEGES
илиUSE SCHEMA
и привилегииCREATE VOLUME
в целевом schema.
-
- При необходимости можно использовать существующий том. У вас должны быть следующие привилегии:
-
USE CATALOG
для родительского catalog. -
USE SCHEMA
для родительского schema. -
ALL PRIVILEGES
READ VOLUME
илиWRITE VOLUME
на целевом томе.
-
- Чтобы создать новый том в существующей schema, необходимо иметь следующие привилегии:
Чтобы получить set эти разрешения, обратитесь к администратору Databricks. Для получения дополнительной информации о привилегиях Unity Catalog см. в разделе привилегии Unity Catalog и защищаемые объекты.
- Чтобы создать новую schema в catalog, необходимо иметь
Шаг 0. Скачивание данных
В этом примере загружаются данные из тома Catalog Unity. Следующий код скачивает CSV-файл и сохраняет его в указанном томе. Откройте новую записную книжку и запустите следующий код, чтобы скачать эти данные в указанный том:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Замените <catalog-name>
, <schema-name>
и <volume-name>
catalog, schemaи именами томов для тома Unity Catalog. Предоставленный код пытается создать указанные schema и том, если эти объекты не существуют. Для создания и записи в объекты в Unity Catalogнеобходимо иметь соответствующие права. См. раздел Требования.
Примечание.
Убедитесь, что эта записная книжка успешно запущена, прежде чем продолжить работу с руководством. Не настраивайте эту записную книжку как часть конвейера.
Шаг 1. Создание конвейера
Delta Live Tables создает конвейеры путем разрешения зависимостей, определенных в записных книжках или файлах (называемых исходным кодом) с помощью синтаксиса Delta Live Tables. Каждый файл исходного кода может содержать только один язык, но в конвейер можно добавить несколько записных книжек или файлов, относящихся к определенному языку.
Внимание
Не настраивайте ресурсы в поле исходного кода . При выходе из этого поля черное поле создает и настраивает записную книжку для разработки исходного кода.
Инструкции в этом руководстве используют бессерверные вычислительные ресурсы и Unity Catalog. Используйте параметры по умолчанию для всех параметров конфигурации, не указанных в этих инструкциях.
Примечание.
Если бессерверные не включены или поддерживаются в рабочей области, вы можете выполнить руководство, как написано с помощью параметров вычислений по умолчанию. Необходимо вручную selectUnity Catalog в разделе параметров хранилища в разделе Целевой пользовательского интерфейса создания конвейера.
Чтобы настроить новый конвейер, выполните следующие действия.
- На боковой панели щелкните Delta Live Tables.
- Щелкните Создать конвейер.
- В поле Название конвейеравведите уникальное имя для конвейера.
- Select флажок Serverless.
- В назначенияпубликуютсяCatalogwhereдля настройки расположения Unity tables, select, Catalog и Schema.
- В Advancedщелкните Добавьте конфигурацию, а затем создайте конвейер parameters для catalog, schemaи тома, в который скачали данные, с помощью следующих имен параметров:
my_catalog
my_schema
my_volume
- Нажмите кнопку Создать.
Пользовательский интерфейс конвейеров отображается для нового конвейера. Записная книжка исходного кода автоматически создается и настроена для конвейера.
Записная книжка создается в новом каталоге в пользовательском каталоге. Имя нового каталога и файла совпадает с именем конвейера. Например, /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Ссылка на доступ к этой записной книжке находится в поле исходного кода на панели сведений о конвейере. Щелкните ссылку, чтобы открыть записную книжку, прежде чем перейти к следующему шагу.
Шаг 2. Объявите материализованные views и потоковые tables в ноутбуке с использованием Python или SQL.
Записные книжки Datbricks можно использовать для интерактивной разработки и проверки исходного кода для конвейеров Delta Live Tables. Чтобы использовать эту функцию, необходимо подключить записную книжку к конвейеру. Чтобы подключить только что созданную записную книжку к созданному конвейеру, выполните следующие действия.
- Нажмите кнопку "Подключиться" в правом верхнем углу, чтобы открыть меню конфигурации вычислений.
- Наведите указатель мыши на имя конвейера, созданного на шаге 1.
- Щелкните Подключить.
Изменения пользовательского интерфейса для включения кнопок проверки и запуска в правом верхнем углу. Дополнительные сведения о поддержке разработки кода конвейера см. в статье Разработка и отладка конвейеров Delta Live Tables в записных книжках.
Внимание
- Конвейеры Delta Live Tables оценивают все ячейки ноутбука во время планирования. В отличие от записных книжек, выполняемых для вычислений всех целей или запланированных в качестве заданий, конвейеры не гарантируют, что ячейки выполняются в указанном порядке.
- Записные книжки могут содержать только один язык программирования. Не смешивайте код Python и SQL в записных книжках исходного кода конвейера.
Дополнительные сведения о разработке кода с помощью Python или SQL см. в статье Разработка кода конвейера с помощью Python или разработки кода конвейера с помощью SQL.
Пример кода конвейера
Чтобы реализовать пример в этом руководстве, скопируйте и вставьте следующий код в ячейку в записной книжке, настроенной как исходный код для конвейера.
Указанный код выполняет следующие действия:
- Импортирует необходимые модули (только Python).
- Ссылки parameters, определенные при настройке конвейера.
- Определяет стриминг table под именем
baby_names_raw
, получающий данные из тома. - Определяет материализованное представление с именем
baby_names_prepared
, которое проверяет прием данных. - Определяет материализованное представление с именем
top_baby_names_2021
, которое имеет строго уточненное представление данных.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
шаг 3. Запуск конвейера update
Чтобы запустить конвейер update, нажмите кнопку "Пуск" в правом верхнем углу интерфейса ноутбука.
Примеры записных книжек
Следующие записные книжки содержат те же примеры кода, которые приведены в этой статье. Эти записные книжки имеют те же требования, что и действия, описанные в этой статье. См. раздел Требования.
Чтобы импортировать записную книжку, выполните следующие действия.
- Откройте пользовательский интерфейс записной книжки.
- Нажмите кнопку +Создать>
- Откроется пустая записная книжка.
- Последовательно выберите Файл>Импортировать. Откроется диалоговое окно импорта .
параметр URL-адреса для импорта из .- Вставьте URL-адрес записной книжки.
- Нажмите кнопку Импорт.
В этом руководстве необходимо запустить блокнот подготовки данных перед настройкой и запуском канала Delta Live Tables. Импортируйте следующую записную книжку, подключите записную книжку к вычислительному ресурсу, введите необходимую переменную для my_catalog
, а затем my_schema
нажмите кнопку my_volume
".
Руководство по скачиванию данных для конвейеров
В следующих записных книжках приведены примеры в Python или SQL. При импорте записной книжки он сохраняется в домашнем каталоге пользователя.
После импорта одной из приведенных ниже записных книжек выполните действия по созданию конвейера, но