Κοινή χρήση μέσω


Μετασχηματισμός δεδομένων με χρήση dbt

Σημείωμα

Η εργασία Airflow Apache υποστηρίζεται από το Apache Airflow.

Το dbt(Data Build Tool) είναι μια διασύνδεση γραμμής εντολών ανοιχτού κώδικα (CLI) που απλοποιεί τον μετασχηματισμό δεδομένων και τη μοντελοποίηση εντός αποθηκών δεδομένων, με τη διαχείριση σύνθετου κώδικα SQL με δομημένο και διατηρήσιμο τρόπο. Επιτρέπει στις ομάδες δεδομένων να δημιουργούν αξιόπιστους μετασχηματισμούς με δυνατότητα δοκιμής στον πυρήνα των διοχετεύσεων ανάλυσης.

Σε συνδυασμό με την airflow Apache, οι δυνατότητες μετασχηματισμού της βάσης βελτιώνονται με τις δυνατότητες προγραμματισμού, ενορχήστρωσης και διαχείρισης εργασιών της Airflow. Αυτή η συνδυασμένη προσέγγιση, η χρήση της εξειδίκευσης μετασχηματισμού της dbt παράλληλα με τη διαχείριση ροών εργασιών της Airflow, παρέχει αποτελεσματικές και ισχυρές διοχετεύσεις δεδομένων, οδηγώντας τελικά σε ταχύτερες και πιο ενημερωμένες αποφάσεις βάσει δεδομένων.

Αυτή η εκμάθηση δείχνει πώς μπορείτε να δημιουργήσετε ένα DAG Airflow Apache που χρησιμοποιεί βάση δεδομένων για τον μετασχηματισμό δεδομένων που είναι αποθηκευμένα στην Αποθήκη δεδομένων Microsoft Fabric.

Προαπαιτούμενα στοιχεία

Για να ξεκινήσετε, πρέπει να συμπληρώσετε τις ακόλουθες προϋποθέσεις:

  • Ενεργοποιήστε την εργασία Airflow Apache στον μισθωτή σας.

    Σημείωμα

    Δεδομένου ότι η εργασία Airflow Apache βρίσκεται σε κατάσταση προεπισκόπησης, πρέπει να την ενεργοποιήσετε μέσω του διαχειριστή μισθωτή σας. Εάν βλέπετε ήδη την Εργασία airflow Apache, ο διαχειριστής μισθωτή σας μπορεί να την έχει ήδη ενεργοποιήσει.

    1. Μεταβείτε στην Πύλη διαχείρισης -> Ρυθμίσεις μισθωτή -> Στην περιοχή Microsoft Fabric -> Αναπτύξτε την ενότητα "Οι χρήστες μπορούν να δημιουργήσουν και να χρησιμοποιήσουν την εργασία ροής αέρα Apache (προεπισκόπηση)".

    2. Επιλέξτε Εφαρμογή. Στιγμιότυπο οθόνης για την ενεργοποίηση της ροής αέρα Apache στον μισθωτή.

  • Δημιουργήστε την κύρια υπηρεσία. Προσθέστε την κύρια υπηρεσία ως τον Contributor χώρο εργασίας όπου δημιουργείτε μια αποθήκη δεδομένων.

  • Εάν δεν έχετε, δημιουργήστε μια αποθήκη Fabric. Πρόσληψη του δείγματος δεδομένων στην αποθήκη με χρήση διοχέτευσης δεδομένων. Για αυτό το εκπαιδευτικό βοήθημα, χρησιμοποιούμε το δείγμα NYC Taxi-Green .

  • Δημιουργήστε την "Εργασία ροής αέρα Apache" στον χώρο εργασίας.

Μετασχηματισμός των δεδομένων που είναι αποθηκευμένα στην αποθήκη Fabric με χρήση dbt

Αυτή η ενότητα σάς καθοδηγεί στα παρακάτω βήματα:

  1. Καθορίστε τις απαιτήσεις.
  2. Δημιουργήστε ένα έργο βάσης δεδομένων στον διαχειριζόμενο χώρο αποθήκευσης Fabric που παρέχεται από την εργασία Airflow Apache.
  3. Δημιουργία DAG airflow Apache για την ενορχήστρωση εργασιών dbt

Καθορισμός των απαιτήσεων

Δημιουργήστε ένα αρχείο requirements.txt στον dags φάκελο. Προσθέστε τα παρακάτω πακέτα ως απαιτήσεις της ροής αέρα Apache.

  • astronomer-cosmos: Αυτό το πακέτο χρησιμοποιείται για την εκτέλεση των έργων του πυρήνα της βάσης σας ως νταγκ και ομάδες εργασιών του Apache Airflow.

  • dbt-fabric: Αυτό το πακέτο χρησιμοποιείται για τη δημιουργία έργου dbt, το οποίο μπορεί έπειτα να αναπτυχθεί σε μια Αποθήκη δεδομένων Fabric

       astronomer-cosmos==1.0.3
       dbt-fabric==1.5.0
    

Δημιουργήστε ένα έργο βάσης δεδομένων στον διαχειριζόμενο χώρο αποθήκευσης Fabric που παρέχεται από την εργασία Airflow Apache.

  1. Σε αυτή την ενότητα, δημιουργούμε ένα δείγμα έργου dbt στην εργασία ροής αέρα Apache για το σύνολο nyc_taxi_green δεδομένων με την ακόλουθη δομή καταλόγου.

       dags
       |-- my_cosmos_dag.py
       |-- nyc_taxi_green
       |  |-- profiles.yml
       |  |-- dbt_project.yml
       |  |-- models
       |  |   |-- nyc_trip_count.sql
       |  |-- target
    
  2. Δημιουργήστε τον φάκελο με nyc_taxi_green ονομασία στον dags φάκελο με profiles.yml το αρχείο . Αυτός ο φάκελος περιέχει όλα τα αρχεία που απαιτούνται για το έργο dbt. Στιγμιότυπο οθόνης που εμφανίζει τη δημιουργία αρχείων για το έργο dbt.

  3. Αντιγράψτε τα παρακάτω περιεχόμενα στο profiles.yml. Αυτό το αρχείο ρύθμισης παραμέτρων περιέχει λεπτομέρειες σύνδεσης βάσης δεδομένων και προφίλ που χρησιμοποιούνται από τη βάση δεδομένων. Ενημερώστε τις τιμές κράτησης θέσης και αποθηκεύστε το αρχείο.

    config:
      partial_parse: true
    nyc_taxi_green:
      target: fabric-dev
      outputs:
        fabric-dev:
          type: fabric
          driver: "ODBC Driver 18 for SQL Server"
          server: <sql connection string of your data warehouse>
          port: 1433
          database: "<name of the database>"
          schema: dbo
          threads: 4
          authentication: ServicePrincipal
          tenant_id: <Tenant ID of your service principal>
          client_id: <Client ID of your service principal>
          client_secret: <Client Secret of your service principal>
    
  4. Δημιουργήστε το dbt_project.yml αρχείο και αντιγράψτε τα παρακάτω περιεχόμενα. Αυτό το αρχείο καθορίζει τη ρύθμιση παραμέτρων σε επίπεδο έργου.

    name: "nyc_taxi_green"
    
    config-version: 2
    version: "0.1"
    
    profile: "nyc_taxi_green"
    
    model-paths: ["models"]
    seed-paths: ["seeds"]
    test-paths: ["tests"]
    analysis-paths: ["analysis"]
    macro-paths: ["macros"]
    
    target-path: "target"
    clean-targets:
      - "target"
      - "dbt_modules"
      - "logs"
    
    require-dbt-version: [">=1.0.0", "<2.0.0"]
    
    models:
      nyc_taxi_green:
        materialized: table
    
  5. Δημιουργήστε τον models φάκελο στον nyc_taxi_green φάκελο. Για αυτό το εκπαιδευτικό βοήθημα, δημιουργούμε το δείγμα μοντέλου στο αρχείο με το όνομα nyc_trip_count.sql που δημιουργεί τον πίνακα που εμφανίζει τον αριθμό των ταξιδιών ανά ημέρα ανά προμηθευτή. Αντιγράψτε τα παρακάτω περιεχόμενα στο αρχείο.

       with new_york_taxis as (
           select * from nyctlc
       ),
       final as (
         SELECT
           vendorID,
           CAST(lpepPickupDatetime AS DATE) AS trip_date,
           COUNT(*) AS trip_count
         FROM
             [contoso-data-warehouse].[dbo].[nyctlc]
         GROUP BY
             vendorID,
             CAST(lpepPickupDatetime AS DATE)
         ORDER BY
             vendorID,
             trip_date;
       )
       select * from final
    

    Στιγμιότυπο οθόνης που εμφανίζει μοντέλα για το έργο dbt.

Δημιουργία DAG airflow Apache για την ενορχήστρωση εργασιών dbt

  • Δημιουργήστε το αρχείο με ονομασία my_cosmos_dag.py στον dags φάκελο και επικολλήστε τα παρακάτω περιεχόμενα σε αυτόν.

    import os
    from pathlib import Path
    from datetime import datetime
    from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
    
    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
    profile_config = ProfileConfig(
         profile_name="nyc_taxi_green",
         target_name="fabric-dev",
         profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml",
    )
    
    dbt_fabric_dag = DbtDag(
         project_config=ProjectConfig(DBT_ROOT_PATH,),
         operator_args={"install_deps": True},
         profile_config=profile_config,
         schedule_interval="@daily",
         start_date=datetime(2023, 9, 10),
         catchup=False,
         dag_id="dbt_fabric_dag",
    )
    

Εκτέλεση του DAG σας

  1. Εκτελέστε το DAG στην εργασία ροής αέρα Apache. Στιγμιότυπο οθόνης που εμφανίζει την εκτέλεση dag.

  2. Για να δείτε το dag σας φορτωμένο στο περιβάλλον εργασίας χρήστη της ροής αέρα Apache, κάντε κλικ στο Monitor in Apache Airflow.Στιγμιότυπο οθόνης που εμφανίζει τον τρόπο παρακολούθησης dbt dbt dag.Στιγμιότυπο οθόνης που εμφανίζει την επιτυχημένη εκτέλεση dag.

Επικύρωση των δεδομένων σας

  • Μετά από μια επιτυχημένη εκτέλεση, για να επικυρώσετε τα δεδομένα σας, μπορείτε να δείτε τον νέο πίνακα με το όνομα "nyc_trip_count.sql" που δημιουργήθηκε στην αποθήκη δεδομένων Fabric σας. Στιγμιότυπο οθόνης που εμφανίζει επιτυχημένο dbt dag.

Γρήγορη εκκίνηση: Δημιουργία εργασίας ροής αέρα Apache