使用 dbt 转换数据

注意

Apache Airflow 作业由 Apache Airflow 提供支持。

dbt(数据生成工具)是一个开源命令行接口 (CLI),它通过以结构化、可维护的方式管理复杂的 SQL 代码,简化了数据仓库中的数据转换和建模。 它使数据团队能够在分析管道的核心位置创建可靠、可测试的转换。

与 Apache Airflow 搭配使用时,dbt 的转换功能可通过 Airflow 的调度、协调和任务管理功能得到增强。 这种组合方法将 dbt 的转型专业技术与 Airflow 的工作流管理相结合,提供了高效、强大的数据管道,最终实现了更快、更有洞察力的数据驱动决策。

本教程演示如何创建 Apache Airflow DAG,该 DAG 使用 dbt 转换存储在 Microsoft Fabric 数据仓库中的数据。

先决条件

若要开始,必须满足以下先决条件:

  • 在租户中启用 Apache Airflow 作业。

    注意

    由于 Apache Airflow 作业处于预览状态,因此需要通过租户管理员启用它。如果你已看到 Apache Airflow 作业,则租户管理员可能已启用它。

    1. 转到“管理门户”->“租户设置”-> 在 Microsoft Fabric 下 -> 展开“用户可以创建和使用 Apache Airflow 作业(预览版)”部分。

    2. 选择“应用”。 在租户中启用 Apache Airflow 的屏幕截图。

  • 创建服务主体。 将服务主体添加为在其中创建数据仓库的工作区中的 Contributor

  • 如果还没有仓库,请创建一个 Fabric 仓库。 使用数据管道将样本数据引入仓库。 在本教程中,我们使用 NYC Taxi-Green 示例。

  • 在工作区中创建“Apache Airflow 作业”。

使用 dbt 转换存储在 Fabric 仓库中的数据

其中包括以下步骤:

  1. 指定要求。
  2. 在 Apache Airflow 作业提供的 Fabric 托管存储中创建 dbt 项目。
  3. 创建 Apache Airflow DAG 以编排 dbt 作业

指定要求

dags 文件夹中创建文件 requirements.txt。 将以下包添加为 Apache Airflow 要求。

  • astronomer-cosmos:此包用于将 dbt 核心项目作为 Apache Airflow dag 和任务组运行。

  • dbt-fabric:此包用于创建 dbt 项目,然后可将其部署到 Fabric 数据仓库

       astronomer-cosmos==1.0.3
       dbt-fabric==1.5.0
    

在 Apache Airflow 作业提供的 Fabric 托管存储中创建 dbt 项目。

  1. 在本部分中,我们将使用以下目录结构在 Apache Airflow 作业中为数据集 nyc_taxi_green 创建一个示例 dbt 项目。

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. 在具有 nyc_taxi_green 文件的 dags 文件夹中创建名为 profiles.yml 的文件夹。 此文件夹包含 dbt 项目所需的所有文件。 屏幕截图显示了为 dbt 项目创建文件。

  3. 将以下内容复制到 profiles.yml。 此配置文件包含 dbt 使用的数据库连接详细信息和配置文件。 更新占位符值并保存文件。

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. 创建一个 dbt_project.yml 文件并复制以下内容。 此文件指定项目级配置。

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. nyc_taxi_green 文件夹中创建 models 文件夹。 在本教程中,我们将在名为 nyc_trip_count.sql 的文件中创建示例模型,该文件创建了显示每个供应商每天行程次数的表。 复制文件中的以下内容。

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    屏幕截图显示了 dbt 项目的模型。

创建 Apache Airflow DAG 以编排 dbt 作业

  • dags 文件夹中创建名称为 my_cosmos_dag.py 的文件,并将以下内容粘贴到其中。

    import os
    from pathlib import Path
    from datetime import datetime
    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    
    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
    profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
    )
    
    dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
    )
    

运行 DAG

  1. 在 Apache Airflow 作业中运行 DAG。 屏幕截图显示运行 dag。

  2. 若要查看在 Apache Airflow UI 中加载的 dag,请单击 Monitor in Apache Airflow.屏幕截图显示如何监视 dbt dag。屏幕截图显示成功的 dag 运行。

验证数据

  • 成功运行后,若要验证数据,你可以查看在 Fabric 数据仓库中创建的名为“nyc_trip_count.sql”的新表。 屏幕截图显示成功的 dbt dag。

快速入门:创建 Apache Airflow 作业