教程:运行第一个 Delta Live Tables 管道

本教程将引导你完成配置第一个增量实时表管道、编写基本 ETL 代码和运行管道更新的步骤。

本教程中的所有步骤都针对启用了 Unity 目录的工作区而设计。 还可以将 Delta Live Tables 管道配置为使用旧版 Hive 元存储。 请参阅 将 Delta Live Tables 管道与旧版 Hive 元存储配合使用。

注意

本教程提供有关使用 Databricks 笔记本开发和验证新管道代码的说明。 还可以在 Python 或 SQL 文件中使用源代码配置管道。

如果已有使用 Delta Live Tables 语法编写的源代码,则可以将管道配置为运行代码。 请参阅配置增量实时表管道

可以使用 Databricks SQL 中的完全声明性 SQL 语法将具体化视图和流式处理表的刷新计划注册和设置为 Unity 目录托管对象。 请参阅 Databricks SQL 中使用具体化视图,并使用 Databricks SQL 中的流式处理表加载数据。

示例:引入和处理纽约婴儿姓名数据

本文中的示例使用一个公开的数据集,其中包含纽约市婴儿姓名的记录。 此示例演示如何使用增量实时表管道:

  • 将卷中的原始 CSV 数据读取到表中。
  • 从引入表中读取记录,并使用 Delta Live Tables 预期 创建包含清理数据的新表。
  • 使用清理后的记录作为创建派生数据集的 Delta Live Tables 查询的输入。

此代码演示了一个简化的奖牌体系结构示例。 请参阅什么是奖牌湖屋体系结构?

此示例的实现为 Python 和 SQL 提供。 按照步骤创建新的管道和笔记本,然后复制粘贴提供的代码。

还提供了包含完整代码的示例 笔记本

要求

  • 若要启动管道,必须具有群集创建权限或可以访问定义增量实时表群集的群集策略。 增量实时表运行时在运行管道之前创建群集,如果没有正确的权限,则会创建失败。

  • 默认情况下,所有用户都可以使用无服务器管道触发更新。 必须在帐户级别启用无服务器,在工作区区域中可能不可用。 请参阅启用无服务器计算

  • 本教程中的示例使用 Unity 目录。 Databricks 建议创建新的架构来运行本教程,因为目标架构中创建多个数据库对象。

    • 若要在目录中创建新架构,必须具有ALL PRIVILEGESUSE CATALOG特权。CREATE SCHEMA
    • 如果无法创建新架构,请针对现有架构运行本教程。 必须具有以下特权:
      • USE CATALOG 父目录。
      • ALL PRIVILEGESUSE SCHEMACREATE MATERIALIZED VIEW以及 CREATE TABLE 目标架构的特权。
    • 本教程使用卷来存储示例数据。 Databricks 建议为本教程创建新卷。 如果为本教程创建新的架构,则可以在该架构中创建新卷。
      • 若要在现有架构中创建新卷,必须具有以下权限:
        • USE CATALOG 父目录。
        • ALL PRIVILEGESUSE SCHEMA 目标 CREATE VOLUME 架构的特权。
      • 可以选择使用现有卷。 必须具有以下特权:
        • USE CATALOG 父目录。
        • USE SCHEMA 的父架构。
        • ALL PRIVILEGESREAD VOLUME 目标 WRITE VOLUME 卷上。

    若要设置这些权限,请联系 Databricks 管理员。 有关 Unity 目录权限的详细信息,请参阅 Unity 目录特权和安全对象

步骤 0:下载数据

此示例从 Unity 目录卷加载数据。 以下代码下载 CSV 文件并将其存储在指定的卷中。 打开新笔记本并运行以下代码,将此数据下载到指定的卷:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

<catalog-name><schema-name><volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。 如果这些对象不存在,则提供的代码将尝试创建指定的架构和卷。 必须具有相应的权限才能在 Unity 目录中创建和写入对象。 请参阅 要求

注意

在继续学习本教程之前,请确保此笔记本已成功运行。 不要将此笔记本配置为管道的一部分。

步骤 1:创建管道

Delta Live Tables 通过使用 Delta Live Tables 语法解析笔记本或文件(称为源代码)中定义的依赖项来创建管道。 每个源代码文件只能包含一种语言,但可以在管道中添加多种语言的笔记本或文件。

重要

请勿在 源代码 字段中配置任何资产。 将此字段留黑会创建并配置用于源代码创作的笔记本。

本教程中的说明使用无服务器计算和 Unity 目录。 使用这些说明中未提及的所有配置选项的默认设置。

注意

如果工作区中未启用或支持无服务器,则可以使用默认计算设置完成本教程。 必须在“创建管道 UI 的目标”部分的“存储”选项手动选择 Unity 目录

若要配置新管道,请执行以下操作:

  1. 单击边栏中的“增量实时表”
  2. 单击“创建管道”
  3. 提供唯一的管道名称
  4. 选中无服务器旁边的框。
  5. 选择要发布数据的目录
  6. 选择目录中的架构
    • 指定用于创建架构的新架构名称。
  7. 使用“高级”下的“添加配置”按钮定义三个管道参数,以添加三个配置。 使用以下参数名称指定下载数据的目录、架构和卷:
    • my_catalog
    • my_schema
    • my_volume
  8. 单击 “创建”

此时会显示新创建的管道的管道 UI。 会自动为管道创建和配置源代码笔记本。

笔记本是在用户目录中的新目录中创建的。 新目录和文件的名称与管道的名称匹配。 例如,/Users/your.username@databricks.com/my_pipeline/my_pipeline

访问此笔记本的链接位于“管道详细信息”面板中的“源代码”字段下。 单击链接以打开笔记本,然后继续执行下一步。

步骤 2:使用 Python 或 SQL 在笔记本中声明具体化视图和流式处理表

可以使用 Datbricks 笔记本以交互方式开发和验证 Delta Live Tables 管道的源代码。 必须将笔记本附加到管道才能使用此功能。 将新创建的笔记本附加到刚刚创建的管道:

  1. 单击右上角的“连接以打开计算配置菜单。
  2. 将鼠标悬停在步骤 1 中创建的管道的名称上。
  3. 单击“连接”。

UI 将更改,以在右上角包括 “验证 ”和 “开始” 按钮。 若要详细了解笔记本对管道代码开发的支持,请参阅 在笔记本中开发和调试增量实时表管道。

重要

  • 在规划期间,增量实时表管道评估笔记本中的所有单元格。 与针对所有用途计算或计划为作业运行的笔记本不同,管道不保证单元格按指定顺序运行。
  • 笔记本只能包含单个编程语言。 不要在管道源代码笔记本中混合 Python 和 SQL 代码。

有关使用 Python 或 SQL 开发代码的详细信息,请参阅 使用 Python 开发管道代码或使用 SQL 开发管道代码。

示例管道代码

若要在本教程中实现该示例,请将以下代码复制并粘贴到配置为管道源代码的笔记本中的单元格中。

提供的代码执行以下操作:

  • 导入必要的模块(仅限 Python)。
  • 引用管道配置期间定义的参数。
  • 定义一个名为从卷引入的流式处理表 baby_names_raw
  • 定义用于 baby_names_prepared 验证引入数据的具体化视图。
  • 定义一个具体化视图,该视图具有 top_baby_names_2021 高度优化的数据视图。

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

步骤 3:启动管道更新

若要启动管道更新,请单击 笔记本 UI 右上角的“开始 ”按钮。

示例笔记本

以下笔记本包含本文中提供的相同代码示例。 这些笔记本的要求与本文中的步骤相同。 请参阅 要求

若要导入笔记本,请完成以下步骤:

  1. 打开笔记本 UI。
    • 单击“+ 新建>笔记本”。
    • 此时会打开一个空笔记本。
  2. 单击“文件”>“导入”。 此时会出现“导入”对话框。
  3. 选择“从中导入”的 URL 选项。
  4. 粘贴笔记本的 URL。
  5. 单击“导入”

本教程要求在配置和运行 Delta Live Tables 管道之前运行数据设置笔记本。 导入以下笔记本,将笔记本附加到计算资源,填写所需的变量 my_catalogmy_schema然后单击 my_volume全部运行”。

管道数据下载教程

获取笔记本

以下笔记本在 Python 或 SQL 中提供了示例。 导入笔记本时,该笔记本将保存到用户主目录。

导入以下笔记本之一后,完成创建管道的步骤,但使用 源代码 文件选取器选择下载的笔记本。 使用配置为源代码的笔记本创建管道后,单击管道 UI 中的“开始以触发更新。

增量实时表 Python 笔记本入门

获取笔记本

增量实时表 SQL 笔记本入门

获取笔记本