在 Databricks 中生成端到端数据管道

本文演示如何创建和部署端到端数据处理管道,包括如何引入原始数据、转换数据,以及对处理过的数据运行分析。

注意

尽管本文演示如何使用 Azure Databricks 笔记本和 Azure Databricks 作业创建完整的数据管道来精心安排好工作流,不过 Azure Databricks 建议使用“Delta Live 表”,这是一种声明性接口,用于构建可靠、可维护、可测试的数据处理管道。

什么是数据管道?

数据管道实现从源系统移动数据、根据要求转换数据以及将数据存储到目标系统所要执行的步骤。 数据管道包括将原始数据转换为用户可以使用的准备好的数据所需的所有过程。 例如,数据管道可以准备数据,以便数据分析师和数据科学家可以通过分析和报告从数据中获得价值。

提取、转换和加载 (ETL) 工作流是数据管道的常见示例。 在 ETL 处理中,数据从源系统引入并写入暂存区域,根据要求(确保数据质量、删除重复数据记录等)进行转换,然后写入目标系统,如数据仓库或数据湖。

数据管道步骤

为了帮助你开始在 Azure Databricks 上构建数据管道,本文中包含的示例演示了如何创建数据处理工作流:

  • 使用 Azure Databricks 功能浏览原始数据集。
  • 创建 Databricks 笔记本以引入原始源数据并将原始数据写入目标表。
  • 创建 Databricks 笔记本以转换原始源数据并将转换后的数据写入目标表。
  • 创建 Databricks 笔记本以查询转换后的数据。
  • 使用 Azure Databricks 作业自动执行数据管道。

要求

  • 已登录到 Azure Databricks,并且已进入数据科学与工程工作区。
  • 你有权限创建群集访问群集
  • (可选)必须在 Unity Catalog 中创建目录架构才能将表格发布到 Unity Catalog。

示例:Million Song Dataset

此示例中使用的数据集是 Million Song Dataset 的子集,该数据集是当代音乐曲目的特征和元数据集合。 此数据集在 Azure Databricks 工作区中包含的示例数据集中可用。

步骤 1:创建群集

若要执行此示例中的数据处理和分析,请创建一个群集来提供运行命令所需的计算资源。

注意

由于此示例使用存储在 DBFS 中的示例数据集,并建议将表持久保留到 Unity Catalog,因此需要创建配置了单用户访问模式的群集。 单用户访问模式提供对 DBFS 的完全访问权限,同时启用对 Unity Catalog 的访问权限。 请参阅 DBFS 和 Unity Catalog 的最佳做法

  1. 单击侧栏中的“计算”。
  2. 请在“计算”页单击“创建群集”。
  3. 在“新群集”页上,输入群集的唯一名称。
  4. 在“访问模式”下,选择“单用户”。
  5. 在“单用户或服务主体访问”中,选择你的用户名。
  6. 将其余值保留为默认状态,然后单击“创建群集”。

若要了解有关 Databricks 群集的详细信息,请参阅计算

步骤 2:浏览源数据

若要了解如何使用 Azure Databricks 接口浏览原始源数据,请参阅浏览数据管道的源数据。 如果要直接转到引入和准备数据,请继续执行步骤 3:引入原始数据

步骤 3:引入原始数据

在此步骤中,将原始数据加载到表中,以待接受进一步处理。 若要管理 Databricks 平台上的数据资产(例如表),Databricks 建议使用 Unity Catalog。 但是,如果你没有创建所需目录和架构的权限,以将表发布到 Unity Catalog,则仍可以通过将表发布到 Hive 元存储来完成以下步骤。

若要引入数据,Databricks 建议使用自动加载程序。 自动加载程序会在新文件到达云对象存储时自动对其进行检测和处理。

可以将自动加载程序配置为自动检测已加载数据的架构,这样无需显式声明数据架构即可初始化表,并在引入新列时让表架构完成相应的演变。 这样就无需一直手动跟踪和应用架构更改。 使用自动加载程序时,Databricks 建议进行架构推理。 但是,如数据浏览步骤中所示,歌曲数据不包含标头信息。 由于标头未随数据一起存储,因此需要显式定义架构,如下一个示例所示。

  1. 在边栏中,单击 新建图标新建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。

  2. 输入笔记本的名称,例如 Ingest songs data。 默认情况下:

    • “Python”是选择的语言。
    • 笔记本将附加到你使用的最后一个群集。 在本例中,是你在步骤 1:创建群集中创建的群集。
  3. 在笔记本的第一个单元格中输入以下内容:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    如果使用的是 Unity Catalog,请将 <table-name> 替换为目录、架构和表格以包含引入的记录(例如 data_pipelines.songs_data.raw_song_data)。 否则,请将 <table-name> 替换为表的名称以包含引入的记录,例如 raw_song_data

    <checkpoint-path> 替换为 DBFS 中用于维护检查点文件的目录路径,例如 /tmp/pipeline_get_started/_checkpoint/song_data

  4. 单击 运行菜单,然后选择“运行单元格”。 此示例使用 README 中的信息定义数据架构,从 file_path 中包含的所有文件中引入歌曲数据,并将数据写入 table_name 指定的表。

步骤 4:准备原始数据

若要准备好用于分析的原始数据,请执行以下步骤,通过筛选掉不需要的列并添加一个包含新记录创建时间戳的新字段来转换原始歌曲数据。

  1. 在边栏中,单击 新建图标新建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。

  2. 输入笔记本的名称。 例如,Prepare songs data。 将默认语言更改为 SQL

  3. 在笔记本的第一个单元格中输入以下内容:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    如果使用的是 Unity Catalog,请将 <table-name> 替换为目录、架构和表格以包含经过筛选和转换的记录(例如 data_pipelines.songs_data.prepared_song_data)。 否则,将 <table-name> 替换为表的名称以包含筛选和转换的记录(例如 prepared_song_data)。

    <raw-songs-table-name> 替换为表的名称,其中包含在上一步中引入的原始歌曲记录。

  4. 单击 运行菜单,然后选择“运行单元格”。

步骤 5:查询转换后的数据

在此步骤中,你将通过添加查询来分析歌曲数据,从而扩展处理管道。 这些查询使用在上一步中创建的准备好的记录。

  1. 在边栏中,单击 新建图标新建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。

  2. 输入笔记本的名称。 例如,Analyze songs data。 将默认语言更改为 SQL

  3. 在笔记本的第一个单元格中输入以下内容:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    <prepared-songs-table-name> 替换为包含准备好的数据的表的名称。 例如,data_pipelines.songs_data.prepared_song_data

  4. 在单元格操作菜单中单击 向下箭头图标,选择“在下方添加单元格”,然后在新单元格中输入以下内容:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    <prepared-songs-table-name> 替换为在上一步中创建的已准备好的表的名称。 例如,data_pipelines.songs_data.prepared_song_data

  5. 若要运行查询并查看输出,请单击“全部运行”。

步骤 6:创建 Azure Databricks 作业以运行管道

可以使用 Azure Databricks 作业创建工作流来自动运行数据引入、处理和分析步骤。

  1. 在数据科学与工程工作区中,执行下列操作之一:
    • 在边栏中,单击 工作流图标工作流”,然后单击 创建工作按钮
    • 在边栏中,单击 新建图标新建”,然后选择“作业”。
  2. 在“任务”选项卡上的任务对话框中,将“为作业添加名称...”替换为你的作业名称。 例如,“Songs workflow”。
  3. 在“任务名称”中输入首个任务的名称,例如 Ingest_songs_data
  4. 在“类型”中,选择“笔记本”任务类型。
  5. 在“源”中,选择“工作区”。
  6. 使用文件资源管理器找到数据引入笔记本,单击笔记本名称,然后单击“确认”。
  7. 在“群集”中,选择“Shared_job_cluster”或在 Create a cluster 步骤中创建的群集。
  8. 单击“创建”。
  9. 单击刚刚创建的任务下方的“添加任务”按钮并选择“笔记本”。
  10. 在“任务名称”中输入任务的名称,例如 Prepare_songs_data
  11. 在“类型”中,选择“笔记本”任务类型。
  12. 在“源”中,选择“工作区”。
  13. 使用文件浏览器找到数据准备笔记本,单击笔记本名称,然后单击“确认”。
  14. 在“群集”中,选择“Shared_job_cluster”或在 Create a cluster 步骤中创建的群集。
  15. 单击“创建”。
  16. 单击刚刚创建的任务下方的“添加任务”按钮并选择“笔记本”。
  17. 在“任务名称”中输入任务的名称,例如 Analyze_songs_data
  18. 在“类型”中,选择“笔记本”任务类型。
  19. 在“源”中,选择“工作区”。
  20. 使用文件资源管理器找到数据分析笔记本,单击笔记本名称,然后单击“确认”。
  21. 在“群集”中,选择“Shared_job_cluster”或在 Create a cluster 步骤中创建的群集。
  22. 单击“创建”。
  23. 若要运行工作流,请单击 立即运行按钮。 若要查看运行详细信息,请在作业运行视图中单击该运行的“开始时间”列中的链接。 单击每个任务以查看任务运行详细信息。
  24. 若要查看工作流完成时的结果,请单击最终的数据分析任务。 此时会出现“输出”页,其中显示了查询结果。

步骤 7:计划数据管道作业

注意

为了演示如何使用 Azure Databricks 作业安排计划的工作流,此入门示例将引入、准备和分析步骤拆分到不同的笔记本,然后再使用每个笔记本在作业中创建任务。 如果处理包含在单个笔记本中,则可以直接从 Azure Databricks 笔记本 UI 轻松安排笔记本的日程。 请参阅创建和管理计划的笔记本作业

一个常见要求是按计划运行数据管道。 若要为运行管道的作业定义计划,请执行以下操作:

  1. 单击边栏中的 工作流图标工作流”。
  2. 在“名称”列中单击作业名称。 边侧面板将显示“作业详细信息”。
  3. 在“作业详细信息”面板中单击“添加触发器”,然后在“触发器类型”中选择“计划”。
  4. 指定时间段、开始时间和时区。 (可选)选中“显示 Cron 语法”复选框以使用 Quartz Cron 语法显示和编辑计划。
  5. 单击“ 保存”。

了解更多