教程:运行端到端湖屋分析管道

本教程介绍了如何为 Azure Databricks 湖屋设置端到端分析管道。

重要

本教程使用交互式笔记本在已启用 Unity Catalog 的群集上完成 Python 中的常见 ETL 任务。 如果未使用 Unity Catalog,请参阅在 Azure Databricks 上运行第一个 ETL 工作负载

本教程中的任务

本文结束时,你会感到自在:

  1. 启动已启用 Unity Catalog 的计算群集
  2. 创建 Databricks 笔记本
  3. 从 Unity Catalog 外部位置写入和读取数据
  4. 使用自动加载程序将增量数据引入配置到 Unity Catalog 表
  5. 执行笔记本单元格来处理、查询和预览数据
  6. 将笔记本计划为 Databricks 作业
  7. 在 Databricks SQL 中查询 Unity Catalog 表

Azure Databricks 提供了一套生产就绪工具,使数据专业人员能够快速开发和部署提取、转换和加载 (ETL) 管道。 Unity Catalog 让数据专员可为整个组织的用户配置和保护存储凭据、外部位置和数据库对象。 Databricks SQL 让分析师可针对生产 ETL 工作负载中使用的同一表运行 SQL 查询,从而实现大规模的实时商业智能。

还可使用增量实时表构建 ETL 管道。 Databricks 创建了增量实时表,以降低构建、部署和维护生产 ETL 管道的复杂性。 请参阅教程:运行第一个增量实时表管道

要求

注意

如果没有群集控制特权,只要拥有群集访问权限,就仍然可以完成以下大部分步骤。

步骤 1:创建群集

若要进行探索性数据分析和数据工程,请创建一个群集来提供执行命令所需的计算资源。

  1. 单击边栏中的 “计算”图标计算”。
  2. 单击边栏中的 新建图标新建”,然后选择“群集”。 此操作将会打开“新建群集/计算”页。
  3. 为群集指定唯一的名称。
  4. 请选择“单节点”单选按钮。
  5. 请从“访问模式”下拉列表中选择“单用户”。
  6. 请确保电子邮件地址在“单用户”字段中可见。
  7. 请选择所需的“Databricks 运行时版本”、11.1 或更高版本来使用 Unity Catalog。
  8. 单击“创建计算”以创建群集。

若要了解有关 Databricks 群集的详细信息,请参阅计算

步骤 2:创建 Databricks 笔记本

若要在工作区中创建笔记本,请单击边栏中的““新建”图标 新建”,然后单击“笔记本”。 将在工作区中打开一个空白笔记本。

若要了解有关创建和管理笔记本的详细信息,请参阅管理笔记本

步骤 3:从 Unity Catalog 托管的外部位置写入和读取数据

Databricks 建议使用自动加载程序来引入增量数据。 自动加载程序会在新文件到达云对象存储时自动对其进行检测和处理。

使用 Unity Catalog 管理对外部位置的安全访问。 对外部位置具有 READ FILES 权限的用户或服务主体可以使用自动加载程序引入数据。

通常,由于来自其他系统的写入,数据将到达外部位置。 在此演示中,可以通过将 JSON 文件写入外部位置来模拟数据到达。

将以下代码复制到笔记本单元格中。 将 catalog 的字符串值替换为具有 CREATE CATALOGUSE CATALOG 权限的目录名。 将 external_location 的字符串值替换为具有 READ FILESWRITE FILESCREATE EXTERNAL TABLE 权限的外部位置的路径。

可将外部位置定义为整个存储容器,但通常指向嵌套在容器中的目录。

外部位置路径的正确格式为 "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

执行此单元格应打印包含 12 字节内容的一行,打印字符串“Hello world!”,并显示所提供目录中存在的所有数据库。 如果无法运行此单元格,请确认你在已启用 Unity Catalog 的工作区中,并向工作区管理员请求适当的权限来完成本教程。

以下 Python 代码使用电子邮件地址在所提供目录中创建唯一的数据库,并在提供的外部位置创建唯一的存储位置。 执行此单元格将删除与本教程相关的所有数据,支持以幂等形式执行此示例。 定义并实例化的某个类,可用于模拟从联网系统到达源外部位置的批量数据。

将此代码复制到笔记本中的新单元格,并执行它来配置环境。

备注

此代码中定义的变量应让你能够安全执行该代码,而不会存在与现有工作区资产或其他用户发生冲突的风险。 在执行此代码时,受限的网络或存储权限将引发错误;请联系工作区管理员来排查这些限制。


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

现在,可以通过将以下代码复制到单元格中并执行该代码来获取一批数据。 最多可以手动执行此单元 60 次来触发新数据到达。

RawData.land_batch()

步骤 4:配置自动加载程序,用于将数据引入 Unity Catalog

Databricks 建议使用 Delta Lake 存储数据。 Delta Lake 是开放源代码存储层,提供 ACID 事务并启用数据湖屋。 Delta Lake 是在 Databricks 中创建的表的默认格式。

若要配置自动加载程序以将数据引入到 Unity Catalog 表,请将以下代码复制并粘贴到笔记本的空单元格中:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

若要了解有关自动加载程序的详细信息,请参阅什么是自动加载程序?

若要了解有关 Unity Catalog 的结构化流的详细信息,请参阅将 Unity Catalog 与结构化流配合使用

步骤 5:处理数据并与之交互

笔记本逐个单元格执行逻辑。 使用以下步骤在单元格中执行逻辑:

  1. 若要运行在上一步中完成的单元格,请选择该单元格并按 SHIFT+ENTER。

  2. 若要查询刚刚创建的表,请将以下代码复制并粘贴到某个空单元格中,然后按 SHIFT+ENTER 运行单元格。

    df = spark.read.table(table)
    
  3. 若要预览 DataFrame 中的数据,请将以下代码复制并粘贴到某个空单元格中,然后按 SHIFT+ENTER 运行单元格。

    display(df)
    

若要详细了解用于可视化数据的交互式选项,请参阅 Databricks 笔记本中的可视化效果

步骤 6:安排作业

可以将 Databricks 笔记本作为生产脚本运行,方法是在 Databricks 作业中将其添加为任务。 在此步骤中,你将创建可手动触发的新作业。

若要将笔记本计划为任务,请执行以下操作:

  1. 单击标题栏右侧的“计划”。
  2. 为“作业名”输入唯一的名称。
  3. 单击“手动”。
  4. 在“群集”下拉列表中,选择在步骤 1 中创建的群集。
  5. 单击“创建”。
  6. 在出现的窗口中单击“立即运行”。
  7. 若要查看作业运行结果,请单击“上次运行”时间戳旁边的外部链接图标。

有关作业的详细信息,请参阅什么是 Databricks 作业?

步骤 7:在 Databricks SQL 中查询表

对当前目录具有 USE CATALOG 权限、对当前架构具有 USE SCHEMA 权限以及对表具有 SELECT 权限的任何人员都可以从其首选的 Databricks API 中查询表的内容。

你需要访问正在运行的 SQL 仓库才能在 Databricks SQL 中执行查询。

之前在本教程中创建的表名为 target_table。 可以使用第一个单元格中提供的目录和具有 e2e_lakehouse_<your-username> 模式的数据库来对其进行查询。 可以使用目录资源管理器来查找你创建的数据对象。

其他集成

详细了解使用 Azure Databricks 进行数据工程的集成和工具: