使用管線轉換數據
本文說明如何使用 DLT 來宣告資料集上的轉換,以及指定如何透過查詢邏輯處理記錄。 它也包含建置 DLT 管線的常見轉換模式範例。
您可以針對傳回 DataFrame 的任何查詢定義數據集。 您可以使用 Apache Spark 內建作業、UDF、自定義邏輯和 MLflow 模型作為 DLT 管線中的轉換。 將數據內嵌至 DLT 管線之後,您可以針對上游來源定義新的數據集,以建立新的串流數據表、具體化檢視和檢視。
若要瞭解如何使用 DLT 有效地執行具狀態處理,請參閱 使用浮水印優化 DLT 中的具狀態處理。
使用檢視、具體化檢視和流式資料表的時機
實作管線查詢時,請選擇最佳的數據集類型,以確保其有效率且可維護。
請考慮使用視圖來進行下列動作:
- 將您需要的大型或複雜查詢拆分成較容易管理的查詢。
- 使用預期驗證中繼結果。
- 減少您不需要保存之結果的儲存和運算成本。 由於數據表已具體化,因此需要額外的計算和記憶體資源。
請考慮在下列情況下使用具體化檢視:
- 多個下游查詢會取用數據表。 因為檢視會視需要計算,因此每次查詢檢視時都會重新計算檢視。
- 其他管線、作業或查詢會取用數據表。 由於檢視並未具體化,因此您只能在相同的管線中使用它們。
- 您想要在開發期間檢視查詢的結果。 由於數據表已具體化,而且可以在管線外部檢視和查詢,因此在開發期間使用數據表有助於驗證計算的正確性。 驗證之後,將不需要具體化的查詢轉換成檢視。
請考慮在下列情況下使用串流資料表:
- 查詢是針對持續或累加成長的數據源所定義。
- 查詢結果應該以累加方式計算。
- 管線需要高輸送量和低延遲。
注意
串流數據表一律會針對串流來源定義。 您也可以搭配 APPLY CHANGES INTO
使用串流來源,以套用 CDC 資料來源的更新。 請參閱 套用變更 API:使用 DLT簡化異動數據擷取。
從目標架構中排除數據表
如果您必須計算不打算提供給外部使用的中繼數據表,則可以使用 TEMPORARY
關鍵詞防止它們發佈至架構。 臨時表仍會根據 DLT 語意來儲存和處理數據,但不應在目前的管線外部存取。 臨時表會在建立它的處理程序的整個存續期間內保留。 使用下列語法來宣告臨時表:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
蟒
@dlt.table(
temporary=True)
def temp_table():
return ("...")
在單一管線中結合串流數據表和具體化檢視
串流數據表會繼承 Apache Spark 結構化串流的處理保證,並設定為處理僅附加數據源的查詢,其中新數據列一律會插入源數據表中,而不是修改。
注意
雖然根據預設,串流表需要只能追加的數據來源,但當串流來源是另一個需要更新或刪除的串流數據表時,您可以使用 skipChangeCommits 旗標來覆寫此行為。
常見的串流模式牽涉到內嵌源數據,以在管線中建立初始數據集。 這些初始數據集通常稱為銅牌數據表,而且通常會執行簡單的轉換。
相較之下,管線中的最終表格常被稱為黃金表格,通常需要複雜的匯總,或需要從 APPLY CHANGES INTO
作業的目標中讀取資料。 由於這些作業本質上會產生更新操作,而不是附加操作,因此不支援將它們用作串流資料表的輸入。 這些轉換更適合具體化檢視。
藉由將串流數據表和具體化檢視混合成單一管線,您可以簡化管線、避免成本高昂的重新擷取或重新處理原始數據,並具備 SQL 的完整功能,透過有效率編碼和篩選的數據集計算複雜的匯總。 下列範例說明這種類型的混合處理:
注意
這些範例會使用自動載入器從雲端記憶體載入檔案。 若要在啟用了 Unity Catalog 的管線中使用 Auto Loader 載入檔案,您必須使用 外部位置。 若要深入瞭解如何搭配 DLT 使用 Unity 目錄,請參閱 搭配您的 DLT 管線使用 Unity 目錄。
蟒
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
深入瞭解如何使用 自動載入器 從 Azure 記憶體累加內嵌 JSON 檔案。
Stream-static 聯結
當以主要靜態維度數據表反正規化僅限附加數據的連續數據流時,串流靜態聯結是不錯的選擇。
隨著每次管線更新,來自數據流的新記錄會與靜態表的最新快照聯結。 如果在處理串流數據表的對應數據之後,在靜態數據表中新增或更新記錄,除非執行完整重新整理,否則不會重新計算結果記錄。
在設定為觸發執行的管線中,靜態數據表會在更新啟動時傳回結果。 在針對持續執行的管線中,每次數據表處理更新時,都會查詢最新版的靜態數據表。
以下是數據流靜態聯結的範例:
蟒
@dlt.table
def customer_sales():
return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
有效率地計算匯總
您可以使用串流數據表,以累加方式計算簡單的分散式匯總,例如 count、min、max 或 sum,以及平均或標準偏差等代數匯總。 Databricks 建議對具有有限群組的查詢進行累加匯總,例如具有 GROUP BY country
子句的查詢。 每個更新只會讀取新的輸入數據。
若要深入瞭解撰寫執行累加匯總的 DLT 查詢,請參閱使用浮水印執行視窗式匯總 。
在 DLT 管線中使用 MLflow 模型
注意
若要在已啟用 Unity 目錄的管線中使用 MLflow 模型,您的管線必須設定為使用 preview
通道。 若要使用 current
信道,您必須設定管線以發佈至 Hive 中繼存放區。
您可以在 DLT 管線中使用 MLflow 定型模型。 MLflow 模型會被視為 Azure Databricks 中的轉換,這表示它們會處理 Spark DataFrame 輸入,並將結果傳回為 Spark DataFrame。 因為 DLT 會針對 DataFrame 定義數據集,因此您可以使用幾行程式代碼,將使用 MLflow 的 Apache Spark 工作負載轉換成 DLT。 如需有關 MLflow 的更多信息,請參閱 MLflow 在生成 AI 代理和 ML 模型生命週期中的應用。
如果您已經有呼叫 MLflow 模型的 Python 筆記本,您可以使用 @dlt.table
裝飾項目,確保已定義函式以傳回轉換結果,將此程式代碼調整為 DLT。 DLT 預設不會安裝 MLflow,因此請確認您已使用 %pip install mlflow
安裝 MLFlow 連結庫,並在筆記本頂端匯入 mlflow
和 dlt
。 如需 DLT 語法的簡介,請參閱 使用 Python 開發管線代碼。
若要在 DLT 中使用 MLflow 模型,請完成下列步驟:
- 取得 MLflow 模型的執行識別碼和模型名稱。 執行標識碼和模型名稱可用來建構 MLflow 模型的 URI。
- 使用 URI 來定義 Spark UDF 以載入 MLflow 模型。
- 在您的數據表定義中呼叫UDF,以使用MLflow模型。
下列範例顯示此模式的基本語法:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
作為完整範例,下列程式代碼會定義名為 loaded_model_udf
的Spark UDF,以載入以貸款風險數據定型的MLflow模型。 用來進行預測的數據列作為參數傳遞至 UDF。 數據表 loan_risk_predictions
會計算 loan_risk_input_data
中每個數據列的預測。
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
保留手動刪除或更新
DLT 可讓您手動刪除或更新數據表中的記錄,並執行重新整理作業來重新計算下游數據表。
根據預設,DLT 會在每次更新管線時根據輸入數據重新計算數據表結果,因此您必須確定已刪除的記錄不會從源數據重載。 將 pipelines.reset.allowed
數據表屬性設定為 false
可防止重新整理數據表,但不會防止累加寫入數據表或新的數據流入數據表。
下圖說明使用兩個串流數據表的範例:
-
raw_user_table
從來源匯入原始用戶資料。 -
bmi_table
會使用來自raw_user_table
的體重和身高,逐步計算 BMI 指數。
您要從 raw_user_table
手動刪除或更新使用者記錄,然後重新計算 bmi_table
。
下列程式代碼示範如何將 pipelines.reset.allowed
數據表屬性設定為 false
,以停用 raw_user_table
的完整重新整理,以便在一段時間內保留預期的變更,但在執行管線更新時會重新計算下游數據表:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);