使用數據處理管線的期望來管理資料品質
使用預期來套用品質條件約束,以在數據流經 ETL 管線時驗證數據。 期望可讓您更深入地了解數據品質指標,進而在偵測到無效記錄時,允許您選擇不更新或刪除記錄。
本文提供預期的概觀,包括語法範例和行為選項。 如需更進階的使用案例和建議的最佳做法,請參閱 預期建議和進階模式。
什麼是期望?
預期是管線具體化檢視、串流數據表或檢視建立語句中的選擇性子句,這些語句會在傳遞查詢的每個記錄上套用數據質量檢查。 預期會使用標準 SQL 布爾語句來指定條件約束。 您可以為單一數據集結合多個期望,並於管線中為所有數據集宣告設定期望。
下列各節介紹期望的三個元件,並提供語法範例。
預期名稱
每個期望都必須有一個名稱,此名稱會作為標識符來追蹤及監視預期。 選擇用來傳達所驗證計量的名稱。 下列範例會定義預期 valid_customer_age
,以確認年齡在 0 到 120 歲之間:
重要
指定數據集的預期名稱必須是唯一的。 您可以在管線中的多個資料集之間重複使用預期。 請參閱 可攜式且可重複使用的期望。
Python
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
評估的限制條件
每個記錄的約束子句是 SQL 條件語句,必須被評估為真或假。 條件約束包含正在驗證之專案的實際邏輯。 當記錄失敗此條件時,就會觸發預期。
約束必須使用有效的 SQL 語法,且不能包含下列內容:
- 自定義 Python 函式
- 外部服務呼叫
- 參考其他數據表的子查詢
以下是可新增至資料集創建敘述的限制條件範例:
Python
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
無效記錄的處理
您必須指定動作,以判斷記錄失敗驗證檢查時會發生什麼情況。 下表描述可用的動作:
動作 | SQL 語法 | Python 語法 | 結果 |
---|---|---|---|
warn (預設值) | EXPECT |
dlt.expect |
無效的記錄會寫入目標。 有效和無效的記錄計數會與其他數據集計量一起記錄。 |
卸除 | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
在將數據寫入目標之前,會移除無效的記錄。 已移除的記錄計數會與其他數據集指標一起登錄。 |
fail | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
無效的記錄可防止更新成功。 重新處理之前需要手動介入。 此預期會導致單一流程失敗,而且不會造成管線中的其他流程失敗。 |
您也可以實作高級邏輯來隔離無效的記錄,而不會失敗或丟棄數據。 請參閱 隔離無效的記錄。
期望追蹤指標
您可以從管道介面查看 warn
或 drop
動作的追蹤指標。 由於 fail
導致偵測到無效記錄時更新失敗,因此不會記錄計量。
若要檢視預期計量,請完成下列步驟:
- 點擊側邊欄中的 Delta Live Tables。
- 點擊 管線 名稱。
- 點選具預期條件的資料集。
- 選取右側提要字段中的 [數據品質] 索引標籤。
您可以藉由查詢 Delta Live Tables 事件記錄檔來檢視數據質量計量。 請參閱從事件記錄檔
保留無效的記錄
保留無效的記錄是預期的預設行為。 在您想要保留違反預期的記錄,並且希望收集通過或未通過限制的記錄數量的統計資料時,請使用 expect
運算符。 違反預期記錄的記錄會連同有效的記錄一起新增至目標資料集:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
捨棄無效的記錄
使用 expect_or_drop
運算子來防止進一步處理無效的記錄。 違反預期的記錄會從目標資料集捨棄:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
無效的記錄失敗
當無效的記錄無法接受時,請使用 expect_or_fail
運算子,在記錄驗證失敗時立即停止執行。 如果作業是資料表更新,系統會以不可部分完成的方式復原交易:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
重要
如果您在管線中定義了多個平行流程,單一流程失敗並不會造成其他流程失敗。
針對不符合預期的更新進行疑難解答
當管線因為預期違規而失敗時,您必須修正管線程式碼,以在重新執行管線之前正確處理無效的資料。
設定為使管線失敗的預期會修改您轉換的 Spark 查詢計劃,以追蹤檢測和報告違規所需的資訊。 您可以使用這項資訊來識別許多查詢導致違規的輸入記錄。 以下是預期範例:
Expectation Violated:
{
"flowName": "sensor-pipeline",
"verboseInfo": {
"expectationsViolated": [
"temperature_in_valid_range"
],
"inputData": {
"id": "TEMP_001",
"temperature": -500,
"timestamp_ms": "1710498600"
},
"outputRecord": {
"sensor_id": "TEMP_001",
"temperature": -500,
"change_time": "2024-03-15 10:30:00"
},
"missingInputData": false
}
}
多重期望管理
注意
雖然 SQL 和 Python 都支援單一數據集內的多個預期,但只有 Python 可讓您將多個個別的期望群組在一起,並指定集體動作。
您可以使用函式 expect_all
、expect_all_or_drop
和 expect_all_or_fail
,將多個期望群組在一起,並指定集體動作。
這些裝飾項目接受 Python 字典做為引數,其中索引鍵是預期名稱,而值是預期限制式。 您可以在管線中的多個資料集中重複使用相同的預期集。 下列顯示每個 expect_all
Python 運算子的範例:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset