Udostępnij za pośrednictwem


Przekształcanie danych przy użyciu dbt

Uwaga

Zadanie apache Airflow jest obsługiwane przez platformę Apache Airflow.

dbt(Data Build Tool) to interfejs wiersza polecenia typu open source, który upraszcza przekształcanie i modelowanie danych w magazynach danych, zarządzając złożonym kodem SQL w ustrukturyzowany, konserwowalny sposób. Umożliwia zespołom danych tworzenie niezawodnych, testowalnych przekształceń w rdzeniu potoków analitycznych.

W połączeniu z platformą Apache Airflow funkcje przekształcania dbt są ulepszane przez funkcje planowania, orkiestracji i zarządzania zadaniami firmy Airflow. To połączone podejście, wykorzystując wiedzę na temat transformacji dbt wraz z zarządzaniem przepływem pracy firmy Airflow, zapewnia wydajne i niezawodne potoki danych, co ostatecznie prowadzi do szybszych i bardziej szczegółowych decyzji opartych na danych.

W tym samouczku pokazano, jak utworzyć grupę DAG platformy Apache Airflow, która używa bazy danych do przekształcania danych przechowywanych w magazynie danych usługi Microsoft Fabric.

Wymagania wstępne

Aby rozpocząć pracę, należy spełnić następujące wymagania wstępne:

  • Włącz zadanie platformy Apache Airflow w dzierżawie.

    Uwaga

    Ponieważ zadanie platformy Apache Airflow jest w wersji zapoznawczej, należy włączyć je za pośrednictwem administratora dzierżawy. Jeśli widzisz już zadanie platformy Apache Airflow, administrator dzierżawy mógł go już włączyć.

    1. Przejdź do pozycji Portal administracyjny —> Ustawienia dzierżawy —> w obszarze Microsoft Fabric —> rozwiń sekcję "Użytkownicy mogą tworzyć zadania platformy Apache Airflow (wersja zapoznawcza)".

    2. Wybierz Zastosuj. Zrzut ekranu przedstawiający włączanie platformy Apache Airflow w dzierżawie.

  • Utwórz jednostkę usługi. Dodaj jednostkę usługi jako element Contributor w obszarze roboczym, w którym tworzysz magazyn danych.

  • Jeśli go nie masz, utwórz magazyn sieci szkieletowej. Pozyskiwanie przykładowych danych do magazynu przy użyciu potoku danych. Na potrzeby tego samouczka użyjemy przykładu NYC Taxi-Green .

  • Utwórz zadanie "Apache Airflow" w obszarze roboczym.

Przekształcanie danych przechowywanych w magazynie sieci szkieletowej przy użyciu bazy danych dbt

W tej sekcji przedstawiono następujące kroki:

  1. Określ wymagania.
  2. Utwórz projekt dbt w magazynie zarządzanym sieci szkieletowej udostępnionym przez zadanie Apache Airflow.
  3. Tworzenie grupy DAG systemu Apache Airflow w celu organizowania zadań dbt

Określanie wymagań

Utwórz plik requirements.txt w folderze dags . Dodaj następujące pakiety jako wymagania dotyczące platformy Apache Airflow.

  • astronom-cosmos: Ten pakiet służy do uruchamiania podstawowych projektów dbt jako dags apache Airflow i grup zadań.

  • dbt-fabric: ten pakiet służy do tworzenia projektu dbt, który następnie można wdrożyć w magazynie danych sieci szkieletowej

       astronomer-cosmos==1.0.3
       dbt-fabric==1.5.0
    

Utwórz projekt dbt w magazynie zarządzanym sieci szkieletowej udostępnionym przez zadanie Apache Airflow.

  1. W tej sekcji utworzymy przykładowy projekt dbt w zadaniu platformy Apache Airflow dla zestawu danych nyc_taxi_green z następującą strukturą katalogu.

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. Utwórz folder o nazwie nyc_taxi_green w folderze dags z plikiem profiles.yml . Ten folder zawiera wszystkie pliki wymagane dla projektu dbt. Zrzut ekranu przedstawiający tworzenie plików dla projektu dbt.

  3. Skopiuj następującą zawartość do pliku profiles.yml. Ten plik konfiguracji zawiera szczegóły połączenia bazy danych i profile używane przez bazę danych dbt. Zaktualizuj wartości symboli zastępczych i zapisz plik.

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. dbt_project.yml Utwórz plik i skopiuj następującą zawartość. Ten plik określa konfigurację na poziomie projektu.

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. models Utwórz folder w folderze nyc_taxi_green . Na potrzeby tego samouczka utworzymy przykładowy model w pliku o nazwie nyc_trip_count.sql , który tworzy tabelę zawierającą liczbę podróży dziennie na dostawcę. Skopiuj następującą zawartość w pliku.

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    Zrzut ekranu przedstawia modele projektu dbt.

Tworzenie grupy DAG systemu Apache Airflow w celu organizowania zadań dbt

  • Utwórz plik o nazwie my_cosmos_dag.py w dags folderze i wklej w nim następującą zawartość.

    import os
    from pathlib import Path
    from datetime import datetime
    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    
    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
    profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
    )
    
    dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
    )
    

Uruchamianie grupy DAG

  1. Uruchom grupę DAG w ramach zadania przepływu powietrza apache. Zrzut ekranu przedstawiający uruchamianie narzędzia dag.

  2. Aby wyświetlić plik dag załadowany w interfejsie użytkownika platformy Apache Airflow, kliknij pozycję Monitor in Apache Airflow.Zrzut ekranu przedstawiający sposób monitorowania bazy danych dbt dag.Zrzut ekranu przedstawia pomyślne uruchomienie narzędzia dag.

Weryfikowanie danych

  • Po pomyślnym uruchomieniu, aby zweryfikować dane, możesz zobaczyć nową tabelę o nazwie "nyc_trip_count.sql" utworzoną w magazynie danych usługi Fabric. Zrzut ekranu przedstawiający pomyślne dag dbt.

Szybki start: tworzenie zadania platformy Apache Airflow