共用方式為


使用 Python 開發管線程式代碼

Delta Live Tables 引進數個新的 Python 程式代碼建構,用於定義管線中具體化檢視和串流數據表。 Python 支援開發管線是以 PySpark DataFrame 和結構化串流 API 的基本概念為基礎。

對於不熟悉 Python 和 DataFrame 的使用者,Databricks 建議使用 SQL 介面。 請參閱 使用 SQL 開發管線程式代碼。

如需 Delta Live Tables Python 語法的完整參考,請參閱 Delta Live Tables Python 語言參考

適用於管線開發的 Python 基本概念

建立 Delta Live Tables 資料集的 Python 程式碼必須傳回 DataFrame。

所有 Delta Live Tables Python API 都會在 dlt 模組中實作。 使用 Python 實作的 Delta Live Tables 管線程式碼必須明確地匯 dlt 入 Python 筆記本和檔案頂端的模組。

Delta Live Tables 特定的 Python 程式代碼與其他類型的 Python 程式代碼有一個重要差異:Python 管線程序代碼不會直接呼叫執行數據擷取和轉換的函式,以建立 Delta Live Tables 數據集。 相反地,Delta Live Tables 會解 dlt 譯管線中所有原始程式碼檔案中模組的裝飾函式,並建置數據流圖形。

重要

若要避免管線執行時發生非預期的行為,請勿在定義資料集的函式中包含可能有副作用的程式碼。 若要深入了解,請參閱 Python 參考

使用 Python 建立具體化檢視或串流數據表

裝飾 @dlt.table 專案會告知 Delta Live Tables 根據函式傳回的結果來建立具體化檢視或串流數據表。 批次讀取的結果會建立具體化檢視,而串流讀取的結果會建立串流數據表。

根據預設,具體化檢視和串流數據表名稱會從函式名稱推斷。 下列程式代碼範例示範建立具體化檢視表和串流數據表的基本語法:

注意

這兩個函式都會參考目錄中的 samples 相同數據表,並使用相同的裝飾函式。 這些範例強調具體化檢視和串流資料表的基本語法唯一差異是使用 spark.readspark.readStream

並非所有數據源都支援串流讀取。 某些數據源應該一律使用串流語意來處理。

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

或者,您可以使用裝飾專案中的 name @dlt.table 自變數來指定資料表名稱。 下列範例示範具體化檢視表和串流數據表的這個模式:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

從物件記憶體載入資料

Delta Live Tables 支援從 Azure Databricks 支援的所有格式載入資料。 請參閱資料格式選項 (機器翻譯)。

注意

這些範例會使用自動掛接至工作區底下 /databricks-datasets 可用的數據。 Databricks 建議使用磁碟區路徑或雲端 URI 來參考儲存在雲端物件記憶體中的數據。 請參閱Unity Catalog 磁碟區是什麼?

Databricks 建議針對儲存在雲端物件記憶體中的數據設定累加擷取工作負載時,使用自動載入器和串流數據表。 請參閱 什麼是自動載入器?

下列範例會使用自動載入器從 JSON 檔案建立串流資料表:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

下列範例會使用批次語意來讀取 JSON 目錄,並建立具體化檢視:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

使用預期驗證數據

您可以使用預期來設定及強制執行資料品質條件約束。 請參閱使用 Delta Live Tables 管理資料品質 (機器翻譯)。

下列程式代碼會使用 @dlt.expect_or_drop 來定義名為 valid_data 的預期,以在數據擷取期間卸除 Null 的記錄:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

查詢管線中定義的具體化檢視和串流數據表

LIVE使用架構來查詢管線中定義的其他具體化檢視和串流數據表。

下列範例會定義四個資料集:

  • 名為 orders 的串流數據表,會載入 JSON 數據。
  • 名為 customers 的具體化檢視,會載入 CSV 數據。
  • 名為 customer_orders 的具體化檢視,會聯結 和 customers 數據集中的orders記錄、將順序時間戳轉換成日期,然後選取 customer_idorder_numberstateorder_date 字段。
  • daily_orders_by_state 名的具體化檢視,會匯總每個狀態的每日訂單計數。
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

在迴圈中建立 for 數據表

您可以使用 Python for 循環,以程式設計方式建立多個數據表。 當您有許多數據源或目標數據集只因少數參數而有所不同時,這可能會很有用,因此維護的總程序代碼較少,而且程式代碼備援較少。

迴圈會 for 依序列順序評估邏輯,但一旦規劃數據集完成之後,管線會平行執行邏輯。

重要

使用此模式來定義資料集時,請確定傳遞至 for 迴圈的值清單一律是加總的。 如果先前在管線中定義的數據集從未來的管線執行中省略,該數據集會自動從目標架構卸除。

下列範例會建立五個數據表,依區域篩選客戶訂單。 在這裡,區域名稱是用來設定目標具體化檢視的名稱,以及篩選源數據。 暫存檢視可用來定義建構最終具體化檢視時所用源數據表的聯結。

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

以下是此管線數據流圖形的範例:

導致五個區域數據表之兩個檢視的數據流圖表。

疑難解答: for 迴圈會建立許多具有相同值的數據表

管線用來評估 Python 程式代碼的延遲執行模型,您的邏輯會在叫用 所裝飾 @dlt.table() 的函式時直接參考個別值。

下列範例示範兩個使用 迴圈定義數據表 for 的正確方法。 在這兩個範例中 tables ,清單中的每個數據表名稱都會在 裝飾 @dlt.table()的函式內明確參考。

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

下列範例 正確參考值。 此範例會建立具有不同名稱的數據表,但所有數據表都會從迴圈中的 for 最後一個值載入數據:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)