共用方式為


DLT SQL 語言參考

本文提供 DLT SQL 程式設計介面的詳細數據。

  • 如需 Python API 的詳細資訊,請參閱 DLT Python 語言參考。
  • 如需 SQL 命令的詳細資訊,請參閱 SQL 語言參考。

您可以在 SQL 查詢中使用 Python 使用者定義函數 (UDF),但是您必須在 Python 檔案中定義這些 UDF,才能在 SQL 來源檔案中呼叫它們。 請參閱 使用者定義的純量函式 - Python

局限性

不支援 PIVOT 子句。 Spark 中的 pivot 作業需要急切載入輸入數據以計算輸出架構。 DLT 不支援此功能。

建立 DLT 具體化視圖或串流表

注意

建立具體化檢視的 CREATE OR REFRESH LIVE TABLE 語法已被取代。 請改用 CREATE OR REFRESH MATERIALIZED VIEW

宣告串流數據表或具體化檢視時,您可以使用相同的基本 SQL 語法。

使用 SQL 定義 DLT 具象化檢視

下列描述使用 SQL 在 DLT 中宣告具體化檢視的語法:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  CLUSTER BY clause
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

使用 SQL 宣告 DLT 串流數據表

您只能使用針對串流來源讀取的查詢來宣告串流數據表。 Databricks 建議使用自動載入器從雲端物件記憶體串流擷取檔案。 請參閱 自動載入器 SQL 語法

將管線中的其他資料表或檢視指定為串流來源時,您必須在數據集名稱周圍包含 STREAM() 函式。

下列描述使用 SQL 在 DLT 中宣告串流資料表的語法:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [CLUSTER BY clause]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

建立 DLT 視圖

下列描述使用 SQL 宣告檢視的語法:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

自動載入器 SQL 語法

下列描述在 SQL 中使用自動載入器的語法:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value>",
      "<option-key>", "<option_value>",
      ...
    )
  )

您可以搭配自動載入器使用支援的格式選項。 使用 map() 函式,您可以將選項傳遞至 read_files() 方法。 選項是索引鍵/值組,其中索引鍵和值是字串。 如需支援格式和選項的詳細資訊,請參閱 檔案格式選項。

範例:定義數據表

您可以從外部數據來源或從管線中定義的數據集讀取來建立資料集。 若要從內部數據集讀取,請指定資料表名稱,以針對目錄和架構使用設定的管線預設值。 下列範例會定義兩個不同的數據集:一個稱為 taxi_raw 的數據表,其接受 JSON 檔案做為輸入來源,而名為 filtered_data 的數據表會接受 taxi_raw 數據表做為輸入:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

範例:從串流來源讀取

若要從串流來源讀取數據,例如自動載入器或內部數據集,請定義 STREAMING 資料表:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

如需串流資料的詳細資訊,請參閱 用管線轉換資料

從實體化視圖或串流表永久刪除記錄

若要從已啟用刪除向量的具體化檢視或串流數據表中永久刪除記錄,例如為滿足GDPR合規性,必須在物件的底層 Delta 資料表上執行額外的操作。 若要確保從具體化檢視刪除記錄,請參閱 永久刪除具有啟用刪除向量之具體化檢視中的記錄。 若要確保從串流資料表刪除記錄,請參閱 從串流資料表永久刪除記錄

控制數據表具體化的方式

資料表還可以對它們的實現提供更多控制。

注意

對於大小小於 1 TB 的數據表,Databricks 建議讓 DLT 控制數據組織。 除非您預期您的資料表會成長超過一個太位元組,否則 Databricks 建議您不要指定分割欄位。

範例:指定架構和叢集數據行

當您定義資料表時,您可以選擇性地指定架構。 下列範例會指定目標數據表的架構,包括使用 Delta Lake 產生的數據行,並定義資料表的叢集數據行:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) CLUSTER BY (order_day_of_week, customer_id)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

根據預設,如果您未指定架構,DLT 會從 table 定義推斷架構。

範例:指定分區欄位

您可以選擇性指定資料表的分區欄位:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Liquid clustering 提供彈性且優化的叢集解決方案。 請考慮對 DLT 使用 CLUSTER BY,而不是使用 PARTITIONED BY

範例:定義數據表條件約束

注意

資料表約束條件的 DLT 支援已於 進入公開預覽。 若要定義數據表條件約束,您的管線必須是已啟用 Unity 目錄的管線,並設定為使用 preview 通道。

指定架構時,您可以定義主鍵和外鍵。 條件約束是參考性的,不會強制執行。 請參閱 SQL 語言參考中的 CONSTRAINT 子句

下列範例會定義具有主鍵和外鍵條件約束的數據表:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

使用 SQL 宣告數據表或檢視時所使用的參數化值

使用 SET 在宣告數據表或檢視表的查詢中指定組態值,包括Spark組態。 在 SET 語句之後,您在筆記本中定義的任何數據表或檢視表都可以存取這些已定義的值。 針對 SET 語句之後的任何數據表或檢視執行 Spark 查詢時,會使用使用 SET 語句指定的任何 Spark 組態。 若要讀取查詢中的組態值,請使用字串插補語法 ${}。 下列範例會設定名為 startDate 的Spark組態值,並在查詢中使用該值:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

若要指定多個組態值,請針對每個值使用不同的 SET 語句。

範例:定義數據列篩選和數據行遮罩

重要

行篩選和列遮罩目前在 公開預覽

若要使用資料列篩選和列遮罩建立實體化視圖或串流表,請使用 ROW FILTER 子句MASK 子句。 下列範例示範如何使用資料列篩選和資料行遮罩來定義具體化檢視和串流數據表:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

如需資料列篩選和資料行遮罩的詳細資訊,請參閱 使用資料列篩選和資料行遮罩發行資料表

SQL 屬性

CREATE TABLE 或 檢視
TEMPORARY
建立數據表,但不會發佈數據表的元數據。 TEMPORARY 子句會指示 DLT 建立可供管線使用的數據表,但不應該在管線外部存取。 為了縮短處理時間,臨時表會在創建管道的整個生命周期內持續存在,而不僅僅是在單次更新時。
STREAMING
建立數據表,以數據流的形式讀取輸入數據集。 輸入數據集必須是串流數據源,例如自動載入器或 STREAMING 數據表。
CLUSTER BY
在數據表上啟用液體叢集,並定義要當做叢集索引鍵使用的數據行。
請參閱 液體聚類技術用於 Delta 表格
PARTITIONED BY
用於分割數據表的一個或多個數據行的選擇性清單。
LOCATION
數據表數據的選擇性儲存位置。 如果未設定,系統會預設為管線儲存位置。
COMMENT
數據表的選擇性描述。
column_constraint
欄位上的選擇性資訊主鍵或外鍵約束。
MASK clause (公開預覽)
新增數據行遮罩函式來匿名敏感數據。 該數據行的未來查詢會傳回評估函式的結果,而不是數據行的原始值。 這適用於細粒度存取控制,因為函式可以檢查使用者的身分識別和群組成員資格,以決定是否要隱藏該值。
請參閱第欄第mask條第款。
table_constraint
表上的選擇性資訊主鍵或外鍵約束條件。
TBLPROPERTIES
表格的表格屬性可選列表。
WITH ROW FILTER clause (公開預覽)
將數據列篩選函式加入至數據表。 後續查詢該數據表時,會取得符合函式評估為 TRUE 的資料列子集。 這適用於精細訪問控制,因為它可讓函式檢查叫用使用者的身分識別和群組成員資格,以決定是否篩選特定數據列。
請參閱 ROW FILTER 條款
select_statement
用於定義該資料表數據集的 DLT 查詢。
CONSTRAINT 子句
EXPECT expectation_name
定義數據品質的約束條件 expectation_name。 如果未定義 ON VIOLATION 條件約束,請將違反條件約束的數據列新增至目標數據集。
ON VIOLATION
要針對失敗的資料列採取選擇性動作:
  • FAIL UPDATE:立即停止管道執行。
  • DROP ROW:丟棄記錄並繼續處理。

在 DLT 中使用 SQL 變更數據擷取

使用 APPLY CHANGES INTO 語句來使用 DLT CDC 功能,如下列所述:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

您可以使用與非APPLY CHANGES 查詢相同的 CONSTRAINT 子句,為 APPLY CHANGES 目標定義數據品質條件約束。 請參閱 透過管線期望來管理資料品質

注意

INSERTUPDATE 事件的預設行為是從來源 插入或更新 CDC 事件:更新目標資料表中符合指定索引鍵的任何資料列,或在目標資料表中不存在比對記錄時插入新資料列。 您可以使用 DELETE 條件來指定 APPLY AS DELETE WHEN 事件的處理。

重要

您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 指定 APPLY CHANGES 目標資料表的架構時,您也必須包含 __START_AT__END_AT 數據行,其數據類型與 sequence_by 字段相同。

請參閱 套用變更 API:使用 DLT簡化異動數據擷取。

條款
KEYS
可唯一識別源數據中數據列的數據行或數據行組合。 這可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。
若要定義資料行的組合,請使用以逗號分隔的數據行清單。
此條款是必需的。
IGNORE NULL UPDATES
允許接收含有部分目標欄位的更新。 當 CDC 事件符合現有的數據列,並指定 IGNORE NULL UPDATES 時,具有 null 的數據行會在目標中保留其現有的值。 這也適用於具有 null值的巢狀數據行。
這個子句是選擇性的。
預設值是使用 null 值覆寫現有的數據行。
APPLY AS DELETE WHEN
指定何時應將 CDC 事件視為 DELETE 而非 upsert。 為了處理順序錯誤的資料,已刪除的資料列會暫時保留為基礎 Delta 資料表中的墓碑,並且會在中繼資料庫中建立一個視圖,以過濾掉這些墓碑。 您可以使用某工具或方法來設定保留間隔
pipelines.cdc.tombstoneGCThresholdInSeconds 資料表屬性
這個子句是選擇性的。
APPLY AS TRUNCATE WHEN
指定 CDC 事件何時應視為完整資料表 TRUNCATE。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。
只有 SCD 類型 1 才支援 APPLY AS TRUNCATE WHEN 子句。 SCD 類型 2 不支援截斷作業。
這個子句是選擇性的。
SEQUENCE BY
指定源數據中 CDC 事件邏輯順序的數據行名稱。 DLT 會使用此排序來處理無序抵達的變更事件。
指定的數據行必須是可排序的數據類型。
此條款是必需的。
COLUMNS
指定要包含在目標數據表中的數據行子集。 您可以選擇:
  • 指定要包含之欄位的完整清單:COLUMNS (userId, name, city)
  • 指定要排除的欄位清單:COLUMNS * EXCEPT (operation, sequenceNum)

這個子句是選擇性的。
預設情況下,如果未指定 COLUMNS 子句,將在目標表中包含所有列。
STORED AS
是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。
這個子句是選擇性的。
預設值為 SCD 類型 1。
TRACK HISTORY ON
指定輸出數據行的子集,以在那些指定的數據行有任何變更時產生記錄記錄。 您可以選擇:
  • 指定要追蹤的欄位的完整清單:COLUMNS (userId, name, city)
  • 指定要從追蹤中排除的數據行清單:COLUMNS * EXCEPT (operation, sequenceNum)

這個子句是選擇性的。 當有任何變更時,預設會追蹤所有輸出資料列的歷程記錄,相當於 TRACK HISTORY ON *