使用 dbt 转换数据
注意
Apache Airflow 作业由 Apache Airflow 提供支持。
dbt(数据生成工具)是一个开源命令行接口 (CLI),它通过以结构化、可维护的方式管理复杂的 SQL 代码,简化了数据仓库中的数据转换和建模。 它使数据团队能够在分析管道的核心位置创建可靠、可测试的转换。
与 Apache Airflow 搭配使用时,dbt 的转换功能可通过 Airflow 的调度、协调和任务管理功能得到增强。 这种组合方法将 dbt 的转型专业技术与 Airflow 的工作流管理相结合,提供了高效、强大的数据管道,最终实现了更快、更有洞察力的数据驱动决策。
本教程演示如何创建 Apache Airflow DAG,该 DAG 使用 dbt 转换存储在 Microsoft Fabric 数据仓库中的数据。
先决条件
若要开始,必须满足以下先决条件:
在租户中启用 Apache Airflow 作业。
注意
由于 Apache Airflow 作业处于预览状态,因此需要通过租户管理员启用它。如果你已看到 Apache Airflow 作业,则租户管理员可能已启用它。
创建服务主体。 将服务主体添加为在其中创建数据仓库的工作区中的
Contributor
。如果还没有仓库,请创建一个 Fabric 仓库。 使用数据管道将样本数据引入仓库。 在本教程中,我们使用 NYC Taxi-Green 示例。
使用 dbt 转换存储在 Fabric 仓库中的数据
其中包括以下步骤:
指定要求
在 dags
文件夹中创建文件 requirements.txt
。 将以下包添加为 Apache Airflow 要求。
astronomer-cosmos:此包用于将 dbt 核心项目作为 Apache Airflow dag 和任务组运行。
dbt-fabric:此包用于创建 dbt 项目,然后可将其部署到 Fabric 数据仓库
astronomer-cosmos==1.0.3 dbt-fabric==1.5.0
在 Apache Airflow 作业提供的 Fabric 托管存储中创建 dbt 项目。
在本部分中,我们将使用以下目录结构在 Apache Airflow 作业中为数据集
nyc_taxi_green
创建一个示例 dbt 项目。dags |-- my_cosmos_dag.py |-- nyc_taxi_green | |-- profiles.yml | |-- dbt_project.yml | |-- models | | |-- nyc_trip_count.sql | |-- target
在具有
nyc_taxi_green
文件的dags
文件夹中创建名为profiles.yml
的文件夹。 此文件夹包含 dbt 项目所需的所有文件。将以下内容复制到
profiles.yml
。 此配置文件包含 dbt 使用的数据库连接详细信息和配置文件。 更新占位符值并保存文件。config: partial_parse: true nyc_taxi_green: target: fabric-dev outputs: fabric-dev: type: fabric driver: "ODBC Driver 18 for SQL Server" server: <sql connection string of your data warehouse> port: 1433 database: "<name of the database>" schema: dbo threads: 4 authentication: ServicePrincipal tenant_id: <Tenant ID of your service principal> client_id: <Client ID of your service principal> client_secret: <Client Secret of your service principal>
创建一个
dbt_project.yml
文件并复制以下内容。 此文件指定项目级配置。name: "nyc_taxi_green" config-version: 2 version: "0.1" profile: "nyc_taxi_green" model-paths: ["models"] seed-paths: ["seeds"] test-paths: ["tests"] analysis-paths: ["analysis"] macro-paths: ["macros"] target-path: "target" clean-targets: - "target" - "dbt_modules" - "logs" require-dbt-version: [">=1.0.0", "<2.0.0"] models: nyc_taxi_green: materialized: table
在
nyc_taxi_green
文件夹中创建models
文件夹。 在本教程中,我们将在名为nyc_trip_count.sql
的文件中创建示例模型,该文件创建了显示每个供应商每天行程次数的表。 复制文件中的以下内容。with new_york_taxis as ( select * from nyctlc ), final as ( SELECT vendorID, CAST(lpepPickupDatetime AS DATE) AS trip_date, COUNT(*) AS trip_count FROM [contoso-data-warehouse].[dbo].[nyctlc] GROUP BY vendorID, CAST(lpepPickupDatetime AS DATE) ORDER BY vendorID, trip_date; ) select * from final
创建 Apache Airflow DAG 以编排 dbt 作业
在
dags
文件夹中创建名称为my_cosmos_dag.py
的文件,并将以下内容粘贴到其中。import os from pathlib import Path from datetime import datetime from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) profile_config = ProfileConfig( profile_name="nyc_taxi_green", target_name="fabric-dev", profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml", ) dbt_fabric_dag = DbtDag( project_config=ProjectConfig(DBT_ROOT_PATH,), operator_args={"install_deps": True}, profile_config=profile_config, schedule_interval="@daily", start_date=datetime(2023, 9, 10), catchup=False, dag_id="dbt_fabric_dag", )