Преобразование данных с помощью dbt
Примечание.
Задание Apache Airflow работает под управлением Apache Airflow.
dbt(Data Build Tool) — это интерфейс командной строки с открытым исходным кодом (CLI), упрощающий преобразование данных и моделирование в хранилищах данных путем управления сложным кодом SQL в структурированном и поддерживаемом режиме. Это позволяет командам данных создавать надежные тестируемые преобразования в основе своих аналитических конвейеров.
При паре с Apache Airflow возможности преобразования dbt дополняются функциями планирования, оркестрации и управления задачами Airflow. Этот объединенный подход, используя опыт преобразования dbt вместе с управлением рабочими процессами Airflow, обеспечивает эффективные и надежные конвейеры данных, что в конечном итоге приводит к более быстрым и подробным решениям, управляемым данными.
В этом руководстве показано, как создать DAG Apache Airflow, использующую dbt для преобразования данных, хранящихся в хранилище данных Microsoft Fabric.
Необходимые компоненты
Чтобы приступить к работе, необходимо выполнить следующие предварительные требования:
Включите задание Apache Airflow в клиенте.
Примечание.
Так как задание Apache Airflow находится в состоянии предварительной версии, его необходимо включить с помощью администратора клиента. Если вы уже видите задание Apache Airflow, возможно, он уже включен администратором клиента.
Создайте субъект-службу. Добавьте субъект-службу в рабочую
Contributor
область, в которой создается хранилище данных.Если у вас его нет, создайте хранилище Fabric. Прием примеров данных в хранилище с помощью конвейера данных. В этом руководстве мы используем пример NYC Taxi-Green .
Преобразование данных, хранящихся в хранилище Fabric, с помощью dbt
В данном разделе описаны следующие действия.
- Укажите требования.
- Создайте проект dbt в управляемом хранилище Fabric, предоставленное заданием Apache Airflow..
- Создание DAG Apache Airflow для оркестрации заданий dbt
Указание требований
Создайте файл requirements.txt
в папке dags
. Добавьте следующие пакеты в качестве требований Apache Airflow.
астроном-cosmos: этот пакет используется для запуска основных проектов dbt в качестве dags Apache Airflow и групп задач.
dbt-fabric: этот пакет используется для создания проекта dbt, который затем можно развернуть в хранилище данных Fabric.
astronomer-cosmos==1.0.3 dbt-fabric==1.5.0
Создайте проект dbt в управляемом хранилище Fabric, предоставленном заданием Apache Airflow.
В этом разделе мы создадим пример проекта dbt в задании Apache Airflow для набора
nyc_taxi_green
данных со следующей структурой каталогов.dags |-- my_cosmos_dag.py |-- nyc_taxi_green | |-- profiles.yml | |-- dbt_project.yml | |-- models | | |-- nyc_trip_count.sql | |-- target
Создайте папку с именем
nyc_taxi_green
в папке с файломdags
profiles.yml
. Эта папка содержит все файлы, необходимые для проекта dbt.Скопируйте следующее содержимое в папку
profiles.yml
. Этот файл конфигурации содержит сведения о подключении к базе данных и профили, используемые dbt. Обновите значения заполнителей и сохраните файл.config: partial_parse: true nyc_taxi_green: target: fabric-dev outputs: fabric-dev: type: fabric driver: "ODBC Driver 18 for SQL Server" server: <sql connection string of your data warehouse> port: 1433 database: "<name of the database>" schema: dbo threads: 4 authentication: ServicePrincipal tenant_id: <Tenant ID of your service principal> client_id: <Client ID of your service principal> client_secret: <Client Secret of your service principal>
Создайте файл и скопируйте следующее
dbt_project.yml
содержимое. Этот файл указывает конфигурацию уровня проекта.name: "nyc_taxi_green" config-version: 2 version: "0.1" profile: "nyc_taxi_green" model-paths: ["models"] seed-paths: ["seeds"] test-paths: ["tests"] analysis-paths: ["analysis"] macro-paths: ["macros"] target-path: "target" clean-targets: - "target" - "dbt_modules" - "logs" require-dbt-version: [">=1.0.0", "<2.0.0"] models: nyc_taxi_green: materialized: table
Создайте папку
models
в папкеnyc_taxi_green
. В этом руководстве мы создадим пример модели в файле с именемnyc_trip_count.sql
, который создает таблицу с числом поездок в день для каждого поставщика. Скопируйте следующее содержимое в файле.with new_york_taxis as ( select * from nyctlc ), final as ( SELECT vendorID, CAST(lpepPickupDatetime AS DATE) AS trip_date, COUNT(*) AS trip_count FROM [contoso-data-warehouse].[dbo].[nyctlc] GROUP BY vendorID, CAST(lpepPickupDatetime AS DATE) ORDER BY vendorID, trip_date; ) select * from final
Создание DAG Apache Airflow для оркестрации заданий dbt
Создайте файл с именем
my_cosmos_dag.py
вdags
папке и вставьте в него следующее содержимое.import os from pathlib import Path from datetime import datetime from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) profile_config = ProfileConfig( profile_name="nyc_taxi_green", target_name="fabric-dev", profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml", ) dbt_fabric_dag = DbtDag( project_config=ProjectConfig(DBT_ROOT_PATH,), operator_args={"install_deps": True}, profile_config=profile_config, schedule_interval="@daily", start_date=datetime(2023, 9, 10), catchup=False, dag_id="dbt_fabric_dag", )
Запуск DAG
Чтобы просмотреть dag, загруженный в пользовательский интерфейс Apache Airflow, нажмите кнопку
Monitor in Apache Airflow.
Проверка данных
- После успешного выполнения для проверки данных вы увидите новую таблицу с именем "nyc_trip_count.sql", созданную в хранилище данных Fabric.