Partilhar via


Transformar dados usando dbt

Nota

O trabalho do Apache Airflow é alimentado pelo Apache Airflow.

dbt (Data Build Tool) é uma interface de linha de comando (CLI) de código aberto que simplifica a transformação e modelagem de dados dentro de data warehouses gerenciando código SQL complexo de forma estruturada e sustentável. Ele permite que as equipes de dados criem transformações confiáveis e testáveis no núcleo de seus pipelines analíticos.

Quando emparelhado com o Apache Airflow, os recursos de transformação do dbt são aprimorados pelos recursos de agendamento, orquestração e gerenciamento de tarefas do Airflow. Essa abordagem combinada, usando a experiência em transformação da dbt juntamente com o gerenciamento de fluxo de trabalho da Airflow, oferece pipelines de dados eficientes e robustos, levando a decisões baseadas em dados mais rápidas e perspicazes.

Este tutorial ilustra como criar um DAG Apache Airflow que usa dbt para transformar dados armazenados no Microsoft Fabric Data Warehouse.

Pré-requisitos

Para começar, você deve preencher os seguintes pré-requisitos:

  • Habilite o Apache Airflow Job em seu locatário.

    Nota

    Como o trabalho do Apache Airflow está em estado de visualização, você precisa habilitá-lo por meio do administrador do locatário. Se você já vir o Apache Airflow Job, o administrador do locatário pode já tê-lo habilitado.

    1. Vá para Portal de Administração -> Configurações do Locatário -> Em Microsoft Fabric -> Expanda a seção "Os usuários podem criar e usar o Apache Airflow Job (visualização)".

    2. Selecione Aplicar. Captura de tela para habilitar o Apache Airflow no locatário.

  • Crie a entidade de serviço. Adicione a entidade de serviço como a Contributor entidade de trabalho onde você cria o data warehouse.

  • Se você não tiver um, crie um armazém de malha. Ingerir os dados de amostra no armazém usando o pipeline de dados. Para este tutorial, usamos o exemplo NYC Taxi-Green .

  • Crie o "Apache Airflow Job" no espaço de trabalho.

Transforme os dados armazenados no armazém de malha usando dbt

Esta seção orienta você pelas seguintes etapas:

  1. Especifique os requisitos.
  2. Crie um projeto dbt no armazenamento gerenciado de malha fornecido pelo trabalho Apache Airflow..
  3. Criar um DAG Apache Airflow para orquestrar trabalhos dbt

Especificar os requisitos

Crie um arquivo requirements.txt na dags pasta. Adicione os seguintes pacotes como requisitos do Apache Airflow.

  • astrônomo-cosmos: Este pacote é usado para executar seus projetos dbt core como dags Apache Airflow e grupos de tarefas.

  • dbt-fabric: este pacote é usado para criar um projeto dbt, que pode ser implantado em um Data Warehouse de malha

       astronomer-cosmos==1.0.3
       dbt-fabric==1.5.0
    

Crie um projeto dbt no armazenamento gerenciado por malha fornecido pelo trabalho Apache Airflow.

  1. Nesta seção, criamos um projeto dbt de exemplo no Apache Airflow Job para o conjunto nyc_taxi_green de dados com a seguinte estrutura de diretórios.

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. Crie a pasta nomeada nyc_taxi_green na pasta com profiles.yml arquivodags. Esta pasta contém todos os arquivos necessários para o projeto dbt. A captura de tela mostra a criação de arquivos para o projeto dbt.

  3. Copie o seguinte conteúdo para o profiles.yml. Este arquivo de configuração contém detalhes de conexão de banco de dados e perfis usados pelo dbt. Atualize os valores de espaço reservado e salve o arquivo.

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. Crie o dbt_project.yml arquivo e copie o conteúdo a seguir. Este arquivo especifica a configuração no nível do projeto.

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. Crie a models pasta na nyc_taxi_green pasta. Para este tutorial, criamos o modelo de exemplo no arquivo chamado nyc_trip_count.sql que cria a tabela que mostra o número de viagens por dia por fornecedor. Copie o seguinte conteúdo no arquivo.

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    A captura de tela mostra modelos para o projeto dbt.

Criar um DAG Apache Airflow para orquestrar trabalhos dbt

  • Crie o arquivo nomeado my_cosmos_dag.py na dags pasta e cole o seguinte conteúdo nele.

    import os
    from pathlib import Path
    from datetime import datetime
    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    
    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
    profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
    )
    
    dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
    )
    

Execute seu DAG

  1. Execute o DAG no Apache Airflow Job. A captura de tela mostra executar dag.

  2. Para ver seu dag carregado na interface do usuário do Apache Airflow, clique em Monitor in Apache Airflow.Screenshot mostra como monitorar dbt dag.A captura de tela mostra a execução bem-sucedida do dag.

Valide os seus dados

  • Após uma execução bem-sucedida, para validar seus dados, você pode ver a nova tabela chamada 'nyc_trip_count.sql' criada em seu data warehouse de malha. A captura de tela mostra o dag dbt bem-sucedido.

Guia de início rápido: criar um trabalho do Apache Airflow