共用方式為


使用 Python 開發管線程式代碼

DLT 引進數個新的 Python 程式代碼建構,用於定義管線中具體化檢視和串流數據表。 Python 支援開發管線是以 PySpark DataFrame 和結構化串流 API 的基本概念為基礎。

對於不熟悉 Python 和 DataFrame 的使用者,Databricks 建議使用 SQL 介面。 請參閱 使用 SQL開發管線程式代碼。

如需 DLT Python 語法的完整參考,請參閱 DLT Python 語言參考

適用於管線開發的 Python 基本概念

建立 DLT 數據集的 Python 程式代碼必須傳回 DataFrame。

所有 DLT Python API 都會在 dlt 模組中實作。 使用 Python 實作的 DLT 管線程式代碼必須明確地匯入 Python 筆記本和檔案頂端的 dlt 模組。

讀取和寫入預設為管線組態期間指定的目錄和架構。 請參閱 設定目標目錄和架構

DLT 特定的 Python 程式代碼與其他類型的 Python 程式代碼有一個重要差異:Python 管線程式代碼不會直接呼叫執行數據擷取和轉換的函式來建立 DLT 數據集。 相反地,DLT 會解譯管線中所有原始程式碼檔案中 dlt 模組的裝飾函式,並建置數據流圖形。

重要

若要避免管線執行時發生非預期的行為,請勿在定義數據集的函式中包含可能有副作用的程序代碼。 若要深入瞭解,請參閱 Python 參考

使用 Python 建立具體化檢視或串流數據表

@dlt.table 裝飾器指示 DLT 根據函式傳回的結果建立具象化視圖或串流資料表。 批次讀取的結果會建立具體化檢視,而串流讀取的結果會建立串流數據表。

根據預設,具體化檢視和串流數據表名稱會從函式名稱推斷。 下列程式代碼範例示範建立具體化檢視表和串流數據表的基本語法:

注意

這兩個函式都會參考 samples 目錄中的相同數據表,並使用相同的裝飾函式。 這些範例強調具體化檢視和串流數據表的基本語法唯一差異是使用 spark.readspark.readStream

並非所有數據源都支援串流讀取。 某些數據源應該一律使用串流語意來處理。

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

或者,您可以使用 @dlt.table 裝飾專案中的 name 自變數來指定資料表名稱。 下列範例示範具體化檢視表和串流數據表的這個模式:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

從物件記憶體載入資料

DLT 支援從 Azure Databricks 支援的所有格式載入數據。 請參閱 數據格式選項。

注意

這些範例會使用自動掛接至工作區 /databricks-datasets 下可用的數據。 Databricks 建議使用磁碟區路徑或雲端 URI 來參考儲存在雲端物件記憶體中的數據。 請參閱 什麼是 Unity 目錄磁碟區?

Databricks 建議在設定針對儲存在雲端物件儲存中的資料進行增量擷取的工作負載時,使用 Auto Loader 和串流資料表。 請參閱 什麼是自動載入器?

下列範例會使用自動載入器從 JSON 檔案建立串流資料表:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

下列範例會使用批次語意來讀取 JSON 目錄,並建立具體化檢視:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

用預期驗證數據

您可以使用預期來設定及強制執行資料品質條件約束。 請參閱 使用期望值管理管線中的資料品質

下列程式代碼會使用 @dlt.expect_or_drop 來定義名為 valid_data 的預期,以在數據擷取期間卸除 null 的記錄:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

查詢管線中定義的具體化檢視和串流數據表

下列範例會定義四個資料集:

  • 名為 orders 的串流數據表,會載入 JSON 數據。
  • 名為 customers 的具體化檢視,可載入 CSV 數據。
  • 名為 customer_orders 的具體化檢視,聯結來自 orderscustomers 數據集的記錄、將順序時間戳轉換成日期,然後選取 customer_idorder_numberstateorder_date 字段。
  • 名為 daily_orders_by_state 的具體化檢視,會匯總每個州的每日訂單計數。

注意

在查詢管線中的檢視或資料表時,您可以直接指定目錄和架構,也可以使用管線中設定的預設值。 在此範例中,orderscustomerscustomer_orders 數據表會從您管線設定的預設目錄和架構中寫入和讀取。

舊版發佈模式會使用 LIVE 架構來查詢管線中定義的其他具體化檢視和串流數據表。 在新管線中,LIVE 架構語法會被默默地忽略。 請參閱 LIVE 架構 (舊版)

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

for 迴圈中建立數據表

您可以使用 Python for 循環,以程式設計方式建立多個數據表。 當您有許多數據源或目標數據集只因少數參數而有所不同時,這可能會很有用,這導致維護所需的總程式碼較少,而且程式碼冗贅較少。

for 迴圈會依序列順序評估邏輯,但一旦為數據集完成規劃,管線就會平行執行邏輯。

重要

使用此模式來定義資料集時,請確定傳遞至 for 迴圈的值清單一律是加總的。 如果先前在管線中定義的資料集於未來的管線運行中被省略,該資料集會自動從目標結構中移除。

下列範例會建立五個數據表,依區域篩選客戶訂單。 在這裡,區域名稱是用來設定目標具體化檢視的名稱,以及篩選源數據。 暫存檢視可用來定義建構最終具體化檢視時所用源數據表的聯結。

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

以下是此管線數據流圖形的範例:

兩個檢視引導進五個區域數據表的數據流圖表。

疑難解答:for 迴圈會建立許多具有相同值的數據表

執行管線用來評估 Python 程式碼的延遲執行模型,需要在調用經 @dlt.table() 裝飾的函數時,您的邏輯必須直接參考個別值。

下列範例示範兩個正確的方法來定義具有 for 循環的數據表。 在這兩個範例中,tables 清單中的每個資料表名稱都會在由 @dlt.table()裝飾的函式中被明確引用。

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

下列範例 未正確引用 的值。 此範例會建立具有不同名稱的數據表,但所有數據表都會從 for 迴圈中的最後一個值載入數據:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)