共用方式為


使用管線設定來管理數據品質

使用預期來套用品質條件約束,以在數據流經 ETL 管線時驗證數據。 預期值提供對數據品質指標的更深入見解,並允許您在偵測到無效記錄時更新失敗或刪除記錄。

本文提供預期的概觀,包括語法範例和行為選項。 如需更進階的使用案例和建議的最佳做法,請參閱 預期建議和進階模式

DLT 預期流程圖

什麼是期望?

在管線具體化檢視、串流資料表或檢視建立語句中,期望是選擇性子句,這些語句會在傳遞查詢的每個記錄上套用資料品質檢查。 預期會使用標準 SQL 布爾語句來指定條件約束。 您可以合併單個數據集的多個預期,並在管線中所有數據集宣告中設定預期。

下列各節介紹期望的三個元件,並提供語法範例。

預期名稱

每個期望都必須有一個名稱,此名稱會作為標識符來追蹤及監視預期。 選擇用來傳達所驗證計量的名稱。 下列範例會定義預期 valid_customer_age,以確認年齡在 0 到 120 歲之間:

重要

指定數據集的預期名稱必須是唯一的。 您可以在管線中的多個資料集之間重複使用預期。 請參閱 可攜式且可重複使用的期望

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

需要評估的約束條件

條件約束子句是一個 SQL 條件語句,每個記錄都必須評估為 true 或 false。 條件約束包含正在驗證之專案的實際邏輯。 當記錄失敗此條件時,就會觸發預期。

限制條件必須使用有效的 SQL 語法,且不能包含下列內容:

  • 自定義 Python 函式
  • 外部服務呼叫
  • 參考其他數據表的子查詢

以下是可新增至數據集建立語句的條件約束範例:

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

針對無效記錄的處理措施

您必須指定動作,以判斷記錄失敗驗證檢查時會發生什麼情況。 下表描述可用的動作:

行動 SQL 語法 Python 語法 結果
警告 (預設值) EXPECT dlt.expect 無效的記錄會寫入目標。 有效和無效的記錄計數會與其他數據集計量一起記錄。
卸除 EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop 將數據寫入目標之前,會移除無效的記錄。 已丟失的記錄數量會與其他數據集的指標一起記錄。
失敗 EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail 無效的記錄可防止更新成功。 重新處理之前需要手動介入。 此預期會導致單一流程失敗,而且不會造成管線中的其他流程失敗。

您也可以執行進階邏輯來隔離保護無效的記錄,而不會導致失敗或刪除資料。 請參閱 隔離無效記錄

期望追蹤指標

您可以從管線 UI 查看 warndrop 動作的追蹤計量。 由於 fail 導致偵測到無效記錄時更新失敗,因此不會記錄計量。

若要檢視預期計量,請完成下列步驟:

  1. 在側邊欄中點擊 DLT
  2. 點擊您的管線 名稱
  3. 單擊一個已定義預期的資料集。
  4. 選取右側側邊欄中的 [數據品質] 索引標籤。

您可以藉由查詢 DLT 事件記錄檔來檢視數據品質計量。 請參閱 從事件記錄檔查詢數據品質

保留無效的記錄

保留無效的記錄是預期的預設行為。 當您想要保留違反預期的記錄,但收集通過或不符合限制的記錄數量的計量時,請使用 expect 運算符。 違反預期記錄的記錄會連同有效的記錄一起新增至目標數據集:

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

卸除無效的記錄

使用 expect_or_drop 運算符來防止進一步處理無效的記錄。 違反預期的記錄會從目標數據集卸除:

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

在無效記錄上失敗

當無效的記錄無法接受時,請使用 expect_or_fail 運算符,在記錄驗證失敗時立即停止執行。 如果作業是資料表更新,系統將原子性地回復交易。

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

重要

如果您在管線中定義了多個平行流程,單一流程失敗並不會造成其他流程失敗。

DLT 流程失敗說明圖表

針對預期失敗的更新進行疑難解答

當管線因為預期違反而失敗時,您必須修正管線的程式碼,以在重新執行管線之前正確處理無效的資料。

設為使管線失敗的預期設定會修改您的轉換過程中的 Spark 查詢計劃,以追蹤偵測和報告違規所需的資訊。 您可以使用這項資訊來識別許多查詢導致違規的輸入記錄。 以下是預期範例:

Expectation Violated:
{
  "flowName": "sensor-pipeline",
  "verboseInfo": {
    "expectationsViolated": [
      "temperature_in_valid_range"
    ],
    "inputData": {
      "id": "TEMP_001",
      "temperature": -500,
      "timestamp_ms": "1710498600"
    },
    "outputRecord": {
      "sensor_id": "TEMP_001",
      "temperature": -500,
      "change_time": "2024-03-15 10:30:00"
    },
    "missingInputData": false
  }
}

多元期望管理

注意

雖然 SQL 和 Python 都支援單一數據集內的多個預期,但只有 Python 可讓您將多個個別的期望群組在一起,並指定集體動作。

具有多個期望的 DLT flow 圖表

您可以使用函式 expect_allexpect_all_or_dropexpect_all_or_fail,將多個期望群組在一起,並指定集體動作。

這些裝飾專案接受 Python 字典做為自變數,其中索引鍵是預期名稱,而值是預期條件約束。 您可以在管線中的多個資料集中重複使用相同的預期集。 下列顯示每個 expect_all Python 運算子的範例:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset