共用方式為


使用數據處理管線的期望來管理資料品質

使用預期來套用品質條件約束,以在數據流經 ETL 管線時驗證數據。 期望可讓您更深入地了解數據品質指標,進而在偵測到無效記錄時,允許您選擇不更新或刪除記錄。

本文提供預期的概觀,包括語法範例和行為選項。 如需更進階的使用案例和建議的最佳做法,請參閱 預期建議和進階模式

Delta Live Tables 預期流程圖

什麼是期望?

預期是管線具體化檢視、串流數據表或檢視建立語句中的選擇性子句,這些語句會在傳遞查詢的每個記錄上套用數據質量檢查。 預期會使用標準 SQL 布爾語句來指定條件約束。 您可以為單一數據集結合多個期望,並於管線中為所有數據集宣告設定期望。

下列各節介紹期望的三個元件,並提供語法範例。

預期名稱

每個期望都必須有一個名稱,此名稱會作為標識符來追蹤及監視預期。 選擇用來傳達所驗證計量的名稱。 下列範例會定義預期 valid_customer_age,以確認年齡在 0 到 120 歲之間:

重要

指定數據集的預期名稱必須是唯一的。 您可以在管線中的多個資料集之間重複使用預期。 請參閱 可攜式且可重複使用的期望

Python

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

評估的限制條件

每個記錄的約束子句是 SQL 條件語句,必須被評估為真或假。 條件約束包含正在驗證之專案的實際邏輯。 當記錄失敗此條件時,就會觸發預期。

約束必須使用有效的 SQL 語法,且不能包含下列內容:

  • 自定義 Python 函式
  • 外部服務呼叫
  • 參考其他數據表的子查詢

以下是可新增至資料集創建敘述的限制條件範例:

Python

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

無效記錄的處理

您必須指定動作,以判斷記錄失敗驗證檢查時會發生什麼情況。 下表描述可用的動作:

動作 SQL 語法 Python 語法 結果
warn (預設值) EXPECT dlt.expect 無效的記錄會寫入目標。 有效和無效的記錄計數會與其他數據集計量一起記錄。
卸除 EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop 在將數據寫入目標之前,會移除無效的記錄。 已移除的記錄計數會與其他數據集指標一起登錄。
fail EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail 無效的記錄可防止更新成功。 重新處理之前需要手動介入。 此預期會導致單一流程失敗,而且不會造成管線中的其他流程失敗。

您也可以實作高級邏輯來隔離無效的記錄,而不會失敗或丟棄數據。 請參閱 隔離無效的記錄

期望追蹤指標

您可以從管道介面查看 warndrop 動作的追蹤指標。 由於 fail 導致偵測到無效記錄時更新失敗,因此不會記錄計量。

若要檢視預期計量,請完成下列步驟:

  1. 點擊側邊欄中的 Delta Live Tables
  2. 點擊 管線 名稱。
  3. 點選具預期條件的資料集。
  4. 選取右側提要字段中的 [數據品質] 索引標籤。

您可以藉由查詢 Delta Live Tables 事件記錄檔來檢視數據質量計量。 請參閱從事件記錄檔查詢數據品質。

保留無效的記錄

保留無效的記錄是預期的預設行為。 在您想要保留違反預期的記錄,並且希望收集通過或未通過限制的記錄數量的統計資料時,請使用 expect 運算符。 違反預期記錄的記錄會連同有效的記錄一起新增至目標資料集:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

捨棄無效的記錄

使用 expect_or_drop 運算子來防止進一步處理無效的記錄。 違反預期的記錄會從目標資料集捨棄:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

無效的記錄失敗

當無效的記錄無法接受時,請使用 expect_or_fail 運算子,在記錄驗證失敗時立即停止執行。 如果作業是資料表更新,系統會以不可部分完成的方式復原交易:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

重要

如果您在管線中定義了多個平行流程,單一流程失敗並不會造成其他流程失敗。

Delta 即時數據表流程失敗說明圖表

針對不符合預期的更新進行疑難解答

當管線因為預期違規而失敗時,您必須修正管線程式碼,以在重新執行管線之前正確處理無效的資料。

設定為使管線失敗的預期會修改您轉換的 Spark 查詢計劃,以追蹤檢測和報告違規所需的資訊。 您可以使用這項資訊來識別許多查詢導致違規的輸入記錄。 以下是預期範例:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

多重期望管理

注意

雖然 SQL 和 Python 都支援單一數據集內的多個預期,但只有 Python 可讓您將多個個別的期望群組在一起,並指定集體動作。

具有多個期望的 Delta 實時數據表 fLow 圖表

您可以使用函式 expect_allexpect_all_or_dropexpect_all_or_fail,將多個期望群組在一起,並指定集體動作。

這些裝飾項目接受 Python 字典做為引數,其中索引鍵是預期名稱,而值是預期限制式。 您可以在管線中的多個資料集中重複使用相同的預期集。 下列顯示每個 expect_all Python 運算子的範例:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset