DLT SQL 語言參考
本文提供 DLT SQL 程式設計介面的詳細數據。
您可以在 SQL 查詢中使用 Python 使用者定義函數 (UDF),但是您必須在 Python 檔案中定義這些 UDF,才能在 SQL 來源檔案中呼叫它們。 請參閱 使用者定義的純量函式 - Python。
局限性
不支援 PIVOT
子句。 Spark 中的 pivot
作業需要急切載入輸入數據以計算輸出架構。 DLT 不支援此功能。
建立 DLT 具體化視圖或串流表
注意
建立具體化檢視的 CREATE OR REFRESH LIVE TABLE
語法已被取代。 請改用 CREATE OR REFRESH MATERIALIZED VIEW
。
宣告串流數據表或具體化檢視時,您可以使用相同的基本 SQL 語法。
使用 SQL 定義 DLT 具象化檢視
下列描述使用 SQL 在 DLT 中宣告具體化檢視的語法:
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
CLUSTER BY clause
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
使用 SQL 宣告 DLT 串流數據表
您只能使用針對串流來源讀取的查詢來宣告串流數據表。 Databricks 建議使用自動載入器從雲端物件記憶體串流擷取檔案。 請參閱 自動載入器 SQL 語法。
將管線中的其他資料表或檢視指定為串流來源時,您必須在數據集名稱周圍包含 STREAM()
函式。
下列描述使用 SQL 在 DLT 中宣告串流資料表的語法:
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[CLUSTER BY clause]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
建立 DLT 視圖
下列描述使用 SQL 宣告檢視的語法:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
自動載入器 SQL 語法
下列描述在 SQL 中使用自動載入器的語法:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value>",
"<option-key>", "<option_value>",
...
)
)
您可以搭配自動載入器使用支援的格式選項。 使用 map()
函式,您可以將選項傳遞至 read_files()
方法。 選項是索引鍵/值組,其中索引鍵和值是字串。 如需支援格式和選項的詳細資訊,請參閱 檔案格式選項。
範例:定義數據表
您可以從外部數據來源或從管線中定義的數據集讀取來建立資料集。 若要從內部數據集讀取,請指定資料表名稱,以針對目錄和架構使用設定的管線預設值。 下列範例會定義兩個不同的數據集:一個稱為 taxi_raw
的數據表,其接受 JSON 檔案做為輸入來源,而名為 filtered_data
的數據表會接受 taxi_raw
數據表做為輸入:
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM taxi_raw
範例:從串流來源讀取
若要從串流來源讀取數據,例如自動載入器或內部數據集,請定義 STREAMING
資料表:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
如需串流資料的詳細資訊,請參閱 用管線轉換資料。
從實體化視圖或串流表永久刪除記錄
若要從已啟用刪除向量的具體化檢視或串流數據表中永久刪除記錄,例如為滿足GDPR合規性,必須在物件的底層 Delta 資料表上執行額外的操作。 若要確保從具體化檢視刪除記錄,請參閱 永久刪除具有啟用刪除向量之具體化檢視中的記錄。 若要確保從串流資料表刪除記錄,請參閱 從串流資料表永久刪除記錄。
控制數據表具體化的方式
資料表還可以對它們的實現提供更多控制。
- 指定如何使用 來
CLUSTER BY
數據表。 您可以使用液體叢集來加速查詢。 請參閱 對 Delta 表使用液態聚類。 - 請指定如何使用
PARTITIONED BY
的方式。 - 您可以使用
TBLPROPERTIES
來設定資料表屬性。 請參閱 DLT 資料表屬性。 - 使用
LOCATION
設定來設定儲存位置。 如果未設定LOCATION
,數據表數據預設會儲存在管線儲存位置。 - 您可以在架構定義中使用 產生的數據行。 請參閱 範例:指定模式和叢集欄位。
注意
對於大小小於 1 TB 的數據表,Databricks 建議讓 DLT 控制數據組織。 除非您預期您的資料表會成長超過一個太位元組,否則 Databricks 建議您不要指定分割欄位。
範例:指定架構和叢集數據行
當您定義資料表時,您可以選擇性地指定架構。 下列範例會指定目標數據表的架構,包括使用 Delta Lake 產生的數據行,並定義資料表的叢集數據行:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
根據預設,如果您未指定架構,DLT 會從 table
定義推斷架構。
範例:指定分區欄位
您可以選擇性指定資料表的分區欄位:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Liquid clustering 提供彈性且優化的叢集解決方案。 請考慮對 DLT 使用 CLUSTER BY
,而不是使用 PARTITIONED BY
。
範例:定義數據表條件約束
注意
資料表約束條件的 DLT 支援已於 進入公開預覽。 若要定義數據表條件約束,您的管線必須是已啟用 Unity 目錄的管線,並設定為使用 preview
通道。
指定架構時,您可以定義主鍵和外鍵。 條件約束是參考性的,不會強制執行。 請參閱 SQL 語言參考中的 CONSTRAINT 子句。
下列範例會定義具有主鍵和外鍵條件約束的數據表:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
使用 SQL 宣告數據表或檢視時所使用的參數化值
使用 SET
在宣告數據表或檢視表的查詢中指定組態值,包括Spark組態。 在 SET
語句之後,您在筆記本中定義的任何數據表或檢視表都可以存取這些已定義的值。 針對 SET
語句之後的任何數據表或檢視執行 Spark 查詢時,會使用使用 SET 語句指定的任何 Spark 組態。 若要讀取查詢中的組態值,請使用字串插補語法 ${}
。 下列範例會設定名為 startDate
的Spark組態值,並在查詢中使用該值:
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
若要指定多個組態值,請針對每個值使用不同的 SET
語句。
範例:定義數據列篩選和數據行遮罩
重要
行篩選和列遮罩目前在 公開預覽。
若要使用資料列篩選和列遮罩建立實體化視圖或串流表,請使用 ROW FILTER 子句 和 MASK 子句。 下列範例示範如何使用資料列篩選和資料行遮罩來定義具體化檢視和串流數據表:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze
如需資料列篩選和資料行遮罩的詳細資訊,請參閱 使用資料列篩選和資料行遮罩發行資料表。
SQL 屬性
CREATE TABLE 或 檢視 |
---|
TEMPORARY 建立數據表,但不會發佈數據表的元數據。 TEMPORARY 子句會指示 DLT 建立可供管線使用的數據表,但不應該在管線外部存取。 為了縮短處理時間,臨時表會在創建管道的整個生命周期內持續存在,而不僅僅是在單次更新時。 |
STREAMING 建立數據表,以數據流的形式讀取輸入數據集。 輸入數據集必須是串流數據源,例如自動載入器或 STREAMING 數據表。 |
CLUSTER BY 在數據表上啟用液體叢集,並定義要當做叢集索引鍵使用的數據行。 請參閱 液體聚類技術用於 Delta 表格。 |
PARTITIONED BY 用於分割數據表的一個或多個數據行的選擇性清單。 |
LOCATION 數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。 |
COMMENT 數據表的選擇性描述。 |
column_constraint 欄位上的選擇性資訊主鍵或外鍵約束。 |
MASK clause (公開預覽)新增數據行遮罩函式來匿名敏感數據。 該數據行的未來查詢會傳回評估函式的結果,而不是數據行的原始值。 這適用於細粒度存取控制,因為函式可以檢查使用者的身分識別和群組成員資格,以決定是否要隱藏該值。 請參閱第欄第 mask 條第款。 |
table_constraint 表上的選擇性資訊主鍵或外鍵約束條件。 |
TBLPROPERTIES 表格的表格屬性可選列表。 |
WITH ROW FILTER clause (公開預覽)將數據列篩選函式加入至數據表。 後續查詢該數據表時,會取得符合函式評估為 TRUE 的資料列子集。 這適用於精細訪問控制,因為它可讓函式檢查叫用使用者的身分識別和群組成員資格,以決定是否篩選特定數據列。 請參閱 ROW FILTER 條款。 |
select_statement 用於定義該資料表數據集的 DLT 查詢。 |
CONSTRAINT 子句 |
---|
EXPECT expectation_name 定義數據品質的約束條件 expectation_name 。 如果未定義 ON VIOLATION 條件約束,請將違反條件約束的數據列新增至目標數據集。 |
ON VIOLATION 要針對失敗的資料列採取選擇性動作:
|
在 DLT 中使用 SQL 變更數據擷取
使用 APPLY CHANGES INTO
語句來使用 DLT CDC 功能,如下列所述:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
您可以使用與非APPLY CHANGES
查詢相同的 CONSTRAINT
子句,為 APPLY CHANGES
目標定義數據品質條件約束。 請參閱 透過管線期望來管理資料品質。
注意
INSERT
和 UPDATE
事件的預設行為是從來源 插入或更新 CDC 事件:更新目標資料表中符合指定索引鍵的任何資料列,或在目標資料表中不存在比對記錄時插入新資料列。 您可以使用 DELETE
條件來指定 APPLY AS DELETE WHEN
事件的處理。
重要
您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定 APPLY CHANGES
目標資料表的架構時,您也必須包含 __START_AT
和 __END_AT
數據行,其數據類型與 sequence_by
字段相同。
請參閱 套用變更 API:使用 DLT簡化異動數據擷取。
條款 |
---|
KEYS 可唯一識別源數據中數據列的數據行或數據行組合。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。 若要定義資料行的組合,請使用以逗號分隔的數據行清單。 此條款是必需的。 |
IGNORE NULL UPDATES 允許接收含有部分目標欄位的更新。 當 CDC 事件符合現有的數據列,並指定 IGNORE NULL UPDATES 時,具有 null 的數據行會在目標中保留其現有的值。 這也適用於具有 null 值的巢狀數據行。這個子句是選擇性的。 預設值是使用 null 值覆寫現有的數據行。 |
APPLY AS DELETE WHEN 指定何時應將 CDC 事件視為 DELETE 而非 upsert。 為了處理順序錯誤的資料,已刪除的資料列會暫時保留為基礎 Delta 資料表中的墓碑,並且會在中繼資料庫中建立一個視圖,以過濾掉這些墓碑。 您可以使用某工具或方法來設定保留間隔pipelines.cdc.tombstoneGCThresholdInSeconds
資料表屬性。這個子句是選擇性的。 |
APPLY AS TRUNCATE WHEN 指定 CDC 事件何時應視為完整資料表 TRUNCATE 。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。只有 SCD 類型 1 才支援 APPLY AS TRUNCATE WHEN 子句。 SCD 類型 2 不支援截斷作業。這個子句是選擇性的。 |
SEQUENCE BY 指定源數據中 CDC 事件邏輯順序的數據行名稱。 DLT 會使用此排序來處理無序抵達的變更事件。 指定的數據行必須是可排序的數據類型。 此條款是必需的。 |
COLUMNS 指定要包含在目標數據表中的數據行子集。 您可以選擇:
這個子句是選擇性的。 預設情況下,如果未指定 COLUMNS 子句,將在目標表中包含所有列。 |
STORED AS 是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。 這個子句是選擇性的。 預設值為 SCD 類型 1。 |
TRACK HISTORY ON 指定輸出數據行的子集,以在那些指定的數據行有任何變更時產生記錄記錄。 您可以選擇:
這個子句是選擇性的。 當有任何變更時,預設會追蹤所有輸出資料列的歷程記錄,相當於 TRACK HISTORY ON * 。 |