Поделиться через


Преобразование данных с помощью dbt

Примечание.

Задание Apache Airflow работает под управлением Apache Airflow.

dbt(Data Build Tool) — это интерфейс командной строки с открытым исходным кодом (CLI), упрощающий преобразование данных и моделирование в хранилищах данных путем управления сложным кодом SQL в структурированном и поддерживаемом режиме. Это позволяет командам данных создавать надежные тестируемые преобразования в основе своих аналитических конвейеров.

При паре с Apache Airflow возможности преобразования dbt дополняются функциями планирования, оркестрации и управления задачами Airflow. Этот объединенный подход, используя опыт преобразования dbt вместе с управлением рабочими процессами Airflow, обеспечивает эффективные и надежные конвейеры данных, что в конечном итоге приводит к более быстрым и подробным решениям, управляемым данными.

В этом руководстве показано, как создать DAG Apache Airflow, использующую dbt для преобразования данных, хранящихся в хранилище данных Microsoft Fabric.

Необходимые компоненты

Чтобы приступить к работе, необходимо выполнить следующие предварительные требования:

  • Включите задание Apache Airflow в клиенте.

    Примечание.

    Так как задание Apache Airflow находится в состоянии предварительной версии, его необходимо включить с помощью администратора клиента. Если вы уже видите задание Apache Airflow, возможно, он уже включен администратором клиента.

    1. Перейдите на портал администрирования —> параметры клиента в> разделе Microsoft Fabric —> разверните раздел "Пользователи могут создавать и использовать задание Apache Airflow (предварительная версия)".

    2. Выберите Применить. Снимок экрана: включение Apache Airflow в клиенте.

  • Создайте субъект-службу. Добавьте субъект-службу в рабочую Contributor область, в которой создается хранилище данных.

  • Если у вас его нет, создайте хранилище Fabric. Прием примеров данных в хранилище с помощью конвейера данных. В этом руководстве мы используем пример NYC Taxi-Green .

  • Создайте задание Apache Airflow в рабочей области.

Преобразование данных, хранящихся в хранилище Fabric, с помощью dbt

В данном разделе описаны следующие действия.

  1. Укажите требования.
  2. Создайте проект dbt в управляемом хранилище Fabric, предоставленное заданием Apache Airflow..
  3. Создание DAG Apache Airflow для оркестрации заданий dbt

Указание требований

Создайте файл requirements.txt в папке dags . Добавьте следующие пакеты в качестве требований Apache Airflow.

  • астроном-cosmos: этот пакет используется для запуска основных проектов dbt в качестве dags Apache Airflow и групп задач.

  • dbt-fabric: этот пакет используется для создания проекта dbt, который затем можно развернуть в хранилище данных Fabric.

       astronomer-cosmos==1.0.3
       dbt-fabric==1.5.0
    

Создайте проект dbt в управляемом хранилище Fabric, предоставленном заданием Apache Airflow.

  1. В этом разделе мы создадим пример проекта dbt в задании Apache Airflow для набора nyc_taxi_green данных со следующей структурой каталогов.

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. Создайте папку с именем nyc_taxi_green в папке с файлом dags profiles.yml . Эта папка содержит все файлы, необходимые для проекта dbt. Снимок экрана: создание файлов для проекта dbt.

  3. Скопируйте следующее содержимое в папку profiles.yml. Этот файл конфигурации содержит сведения о подключении к базе данных и профили, используемые dbt. Обновите значения заполнителей и сохраните файл.

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. Создайте файл и скопируйте следующее dbt_project.yml содержимое. Этот файл указывает конфигурацию уровня проекта.

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. Создайте папку models в папке nyc_taxi_green . В этом руководстве мы создадим пример модели в файле с именем nyc_trip_count.sql , который создает таблицу с числом поездок в день для каждого поставщика. Скопируйте следующее содержимое в файле.

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    Снимок экрана: модели для проекта dbt.

Создание DAG Apache Airflow для оркестрации заданий dbt

  • Создайте файл с именем my_cosmos_dag.py в dags папке и вставьте в него следующее содержимое.

    import os
    from pathlib import Path
    from datetime import datetime
    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    
    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
    profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
    )
    
    dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
    )
    

Запуск DAG

  1. Запустите DAG в задании Apache Airflow. Снимок экрана: запуск dag.

  2. Чтобы просмотреть dag, загруженный в пользовательский интерфейс Apache Airflow, нажмите кнопку Monitor in Apache Airflow.Снимок экрана: мониторинг dag dbt.Снимок экрана: успешный запуск dag.

Проверка данных

  • После успешного выполнения для проверки данных вы увидите новую таблицу с именем "nyc_trip_count.sql", созданную в хранилище данных Fabric. Снимок экрана: успешный dag dbt.

Краткое руководство. Создание задания Apache Airflow