แปลงข้อมูลโดยใช้ dbt
หมายเหตุ
งานกระแสอากาศ Apache ขับเคลื่อนด้วย กระแสอากาศ Apache
dbt(Data Build Tool) เป็นอินเทอร์เฟซบรรทัดคําสั่งแบบโอเพนซอร์ส (CLI) ที่ทําให้การแปลงข้อมูลและการสร้างแบบจําลองภายในคลังข้อมูลง่ายขึ้นด้วยการจัดการรหัส SQL ที่ซับซ้อนด้วยวิธีที่มีโครงสร้างและดูแลรักษาได้ ซึ่งช่วยให้ทีมข้อมูลสามารถสร้างการแปลงที่เชื่อถือได้และทดสอบได้ที่แกนหลักของไปป์ไลน์การวิเคราะห์ของพวกเขา
เมื่อจับคู่กับกระแสอากาศ Apache ความสามารถในการแปลงข้อมูลของ dbt ได้รับการปรับปรุงโดยการกําหนดเวลา การจัดเรียง และการจัดการงานของ Airflow วิธีการที่รวมกันนี้ การใช้ความเชี่ยวชาญในการแปลงข้อมูลของ dbt ควบคู่ไปกับการจัดการเวิร์กโฟลว์ของ Airflow ส่งมอบไปป์ไลน์ข้อมูลที่มีประสิทธิภาพและแข็งแกร่ง ในท้ายที่สุดนําไปสู่การตัดสินใจที่มีข้อมูลที่มีข้อมูลเชิงลึกมากขึ้นและรวดเร็วขึ้น
บทช่วยสอนนี้แสดงวิธีการสร้าง Apache Airflow DAG ที่ใช้ dbt เพื่อแปลงข้อมูลที่จัดเก็บไว้ใน Microsoft Fabric Data Warehouse
ข้อกำหนดเบื้องต้น
เมื่อต้องการเริ่มต้นใช้งาน คุณต้องดําเนินการข้อกําหนดเบื้องต้นต่อไปนี้ให้เสร็จสมบูรณ์:
เปิดใช้งาน Apache Airflow Job ในผู้เช่าของคุณ
หมายเหตุ
เนื่องจากงานกระแสอากาศ Apache อยู่ในสถานะแสดงตัวอย่าง คุณต้องเปิดใช้งานผ่านผู้ดูแลระบบผู้เช่าของคุณ ถ้าคุณเห็นงานกระแสอากาศ Apache ผู้ดูแลระบบผู้เช่าของคุณอาจเปิดใช้งานอยู่แล้ว
สร้างบริการหลัก เพิ่มบริการหลักเป็น
Contributor
ในพื้นที่ทํางานที่คุณสร้างคลังข้อมูลถ้าคุณไม่มี ให้สร้าง Fabric Warehouse นําเข้าข้อมูลตัวอย่างลงในคลังสินค้าโดยใช้ไปป์ไลน์ข้อมูล สําหรับบทช่วยสอนนี้ เราใช้ตัวอย่างรถแท็กซี่-สีเขียวของ NYC
แปลงข้อมูลที่จัดเก็บไว้ใน Fabric Warehouse โดยใช้ dbt
ส่วนนี้แนะนําคุณผ่านขั้นตอนต่อไปนี้:
- ระบุข้อกําหนด
- สร้างโครงการ dbt ในที่เก็บข้อมูลที่จัดการโดย Fabric ซึ่งจัดหาโดยงานกระแสอากาศ Apache
- สร้าง Apache Airflow DAG เพื่อจัดเรียงงาน dbt
ระบุข้อกําหนด
สร้างไฟล์ requirements.txt
ใน dags
โฟลเดอร์ เพิ่มแพคเกจต่อไปนี้ตามความต้องการของกระแสอากาศ Apache
astronomer-cosmos: แพคเกจนี้ถูกใช้เพื่อเรียกใช้โครงการหลักของ dbt ของคุณเป็น Apache Airflow dags และกลุ่มงาน
dbt-fabric: แพคเกจนี้ถูกใช้เพื่อสร้างโครงการ dbt ซึ่งสามารถปรับใช้กับ Fabric Data Warehouse
astronomer-cosmos==1.0.3 dbt-fabric==1.5.0
สร้างโครงการ dbt ในที่เก็บข้อมูลที่จัดการโดย Fabric ซึ่งจัดหาโดยงานกระแสอากาศ Apache
ในส่วนนี้ เราจะสร้างโครงการ dbt ตัวอย่างในงานกระแสอากาศ Apache สําหรับชุดข้อมูล
nyc_taxi_green
ที่มีโครงสร้างไดเรกทอรีต่อไปนี้dags |-- my_cosmos_dag.py |-- nyc_taxi_green | |-- profiles.yml | |-- dbt_project.yml | |-- models | | |-- nyc_trip_count.sql | |-- target
สร้างโฟลเดอร์ที่ชื่อว่า
nyc_taxi_green
ในdags
โฟลเดอร์ ที่มีprofiles.yml
ไฟล์ โฟลเดอร์นี้ประกอบด้วยไฟล์ทั้งหมดที่จําเป็นสําหรับโครงการ dbtคัดลอกเนื้อหาต่อไปนี้ลงใน
profiles.yml
ไฟล์การกําหนดค่านี้ประกอบด้วยรายละเอียดการเชื่อมต่อฐานข้อมูลและโปรไฟล์ที่ใช้โดย dbt อัปเดตค่าตัวแทนข้อความและบันทึกไฟล์config: partial_parse: true nyc_taxi_green: target: fabric-dev outputs: fabric-dev: type: fabric driver: "ODBC Driver 18 for SQL Server" server: <sql connection string of your data warehouse> port: 1433 database: "<name of the database>" schema: dbo threads: 4 authentication: ServicePrincipal tenant_id: <Tenant ID of your service principal> client_id: <Client ID of your service principal> client_secret: <Client Secret of your service principal>
dbt_project.yml
สร้างไฟล์และคัดลอกเนื้อหาต่อไปนี้ ไฟล์นี้ระบุการกําหนดค่าระดับโครงการname: "nyc_taxi_green" config-version: 2 version: "0.1" profile: "nyc_taxi_green" model-paths: ["models"] seed-paths: ["seeds"] test-paths: ["tests"] analysis-paths: ["analysis"] macro-paths: ["macros"] target-path: "target" clean-targets: - "target" - "dbt_modules" - "logs" require-dbt-version: [">=1.0.0", "<2.0.0"] models: nyc_taxi_green: materialized: table
models
สร้างโฟลเดอร์ในnyc_taxi_green
โฟลเดอร์ สําหรับบทช่วยสอนนี้ เราสร้างแบบจําลองตัวอย่างในไฟล์ที่มีnyc_trip_count.sql
ชื่อว่า สร้างตารางที่แสดงจํานวนการเดินทางต่อวันต่อผู้ขาย คัดลอกเนื้อหาต่อไปนี้ในไฟล์with new_york_taxis as ( select * from nyctlc ), final as ( SELECT vendorID, CAST(lpepPickupDatetime AS DATE) AS trip_date, COUNT(*) AS trip_count FROM [contoso-data-warehouse].[dbo].[nyctlc] GROUP BY vendorID, CAST(lpepPickupDatetime AS DATE) ORDER BY vendorID, trip_date; ) select * from final
สร้าง Apache Airflow DAG เพื่อจัดเรียงงาน dbt
สร้างไฟล์ที่มีชื่อว่า
my_cosmos_dag.py
ในโฟลเดอร์ และวางเนื้อหาต่อไปนี้ในdags
นั้นimport os from pathlib import Path from datetime import datetime from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.parent / "dags" / "nyc_taxi_green" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) profile_config = ProfileConfig( profile_name="nyc_taxi_green", target_name="fabric-dev", profiles_yml_filepath=DBT_ROOT_PATH / "profiles.yml", ) dbt_fabric_dag = DbtDag( project_config=ProjectConfig(DBT_ROOT_PATH,), operator_args={"install_deps": True}, profile_config=profile_config, schedule_interval="@daily", start_date=datetime(2023, 9, 10), catchup=False, dag_id="dbt_fabric_dag", )
เรียกใช้ DAG ของคุณ
ตรวจสอบข้อมูลของคุณ
- หลังจากเรียกใช้สําเร็จ เพื่อตรวจสอบข้อมูลของคุณ คุณสามารถดูตารางใหม่ที่ชื่อ 'nyc_trip_count.sql' ที่สร้างขึ้นในคลังข้อมูล Fabric ของคุณ