共用方式為


Delta Live Tables SQL 語言參考

本文提供 Delta Live Tables SQL 程式設計介面的詳細資料。

您可以在 SQL 查詢中使用 Python 使用者定義的函式 (UDF),但是必須在 Python 檔案中定義這些 UDF,才能在 SQL 來源檔案中呼叫它們。 請參閱使用者定義的純量函式 - Python

限制

不支援 PIVOT 子句。 Spark 中的 pivot 操作需要積極式載入輸入資料,才能計算輸出結構描述。 差異即時資料表不支援此功能。

建立差異即時資料表具體化檢視或串流資料表

注意

建立具體化檢視的 CREATE OR REFRESH LIVE TABLE 語法已被取代。 請改用 CREATE OR REFRESH MATERIALIZED VIEW

宣告串流資料表或具體化檢視時,可以使用相同的基本 SQL 語法。

使用 SQL 宣告差異實時數據表具體化檢視

下列描述使用 SQL 在 Delta Live Tables 中宣告具體化檢視的語法:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

使用 SQL 宣告差異實時數據表串流數據表

您只能使用針對串流來源進行讀取的查詢來宣告串流資料表。 Databricks 建議使用自動載入器從雲端物件儲存體中串流檔案的擷取。 請參閱自動載入器 SQL 語法

將管線中的其他資料表或檢視指定為串流來源時,必須在資料集名稱周圍包含 STREAM() 函式。

下列描述使用 SQL 在 Delta Live Tables 中宣告串流數據表的語法:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

建立差異即時資料表檢視

以下描述使用 SQL 來宣告檢視的語法:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

自動載入器 SQL 語法

以下描述在 SQL 中使用自動載入器的語法:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

您可以搭配使用支援的格式選項與自動載入器。 您可以使用 函式 map() ,將選項傳遞至 read_files() 方法。 選項是索引鍵/值組,其中索引鍵和值是字串。 如需支援格式和選項的詳細資訊,請參閱檔案格式選項

範例:定義資料表

可以從外部資料來源或在管線中定義的資料集中進行讀取來建立資料集。 若要從內部數據集讀取,請指定資料表名稱,以針對目錄和架構使用設定的管線預設值。 下列範例會定義兩個不同的資料集:一個稱為 taxi_raw 的資料表,其接受 JSON 檔案做為輸入來源,而名為 filtered_data 的資料表會接受 taxi_raw 資料表做為輸入:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM taxi_raw

範例:從串流來源讀取

若要從串流來源讀取資料,例如自動載入器或內部數據集,請定義 STREAMING 資料表:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

如需串流資料的詳細資訊,請參閱使用 Delta Live Tables 轉換資料

控制資料表具體化的方式

資料表也提供其具體化的額外控制:

注意

對於大小小於 1 TB 的資料表,Databricks 建議讓差異即時資料表控制資料組織。 除非您預期資料表的增長超過 TB,否則 Databricks 建議您不要指定分割資料行。

範例:指定結構描述和資料分割資料行

當您定義資料表時,可以選擇性地指定結構描述。 下列範例會指定目標資料表的結構描述,包括使用 Delta Lake 生成的資料行以及定義資料表的分割資料行:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

根據預設,如果您未指定結構描述,差異即時資料表會從 table 定義推斷結構描述。

範例:定義資料表條件約束

注意

資料表條件約束的 Delta Live Tables 支援處於公開預覽狀態。 若要定義資料表條件約束,您的管線必須是已啟用 Unity Catalog 的管線,並設定為使用 preview 通道。

指定結構描述時,可以定義主索引鍵和外部索引鍵。 條件約束僅供參考,不會強制執行。 請參閱 SQL 語言參考中 CONSTRAINT 的第 條款。

下列範例會定義具有主索引鍵和外部索引鍵條件約束的資料表:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

參數化使用 SQL 宣告資料表或檢視時所使用的值

使用 SET 在宣告資料表或檢視的查詢中指定組態值,包括 Spark 組態。 在 SET 陳述式存取已定義的值之後,在筆記本中定義的任何資料表或檢視。 針對 SET 語句之後的任何數據表或檢視執行 Spark 查詢時,會使用使用 SET 語句指定的任何 Spark 組態。 若要讀取查詢中的組態值,請使用字串插補語法 ${}。 下列範例會設定名為 startDate 的 Spark 組態值,並在查詢中使用該值:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

若要指定多個組態值,請針對每個值使用不同的 SET 陳述式。

範例:定義資料列篩選和資料行遮罩

重要

資料列篩選和資料行遮罩處於公開預覽狀態。

若要使用資料列篩選和數據行遮罩建立具體化檢視表或串流數據表,請使用 ROW FILTER 子句MASK 子句。 下列範例示範如何使用資料列篩選和資料行遮罩來定義具體化檢視和串流數據表:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM sales_bronze

如需有關資料列篩選和資料行遮罩的詳細資訊,請參閱發佈具有資料列篩選和資料行遮罩的資料表

SQL 屬性

CREATE TABLE 或 VIEW
TEMPORARY

建立資料表,但不會發佈資料表的中繼資料。 TEMPORARY 子句會指示 Delta Live Tables 建立可供管線使用的資料表,但不應該在管線外部存取。 為了縮短處理時間,臨時資料表會保存管線的存留期,而不只是單一更新。
STREAMING

建立一個將輸入資料集作為串流進行讀取的資料表。 輸入資料集必須是串流資料來源,例如自動載入器或 STREAMING 資料表。
CLUSTER BY

在資料表上啟用液體群集,並定義要當做叢集索引鍵使用的資料行。

請參閱<針對差異資料表使用液態叢集>。
PARTITIONED BY

用於分割資料表的一個或多個資料行的選用清單。
LOCATION

資料表資料的選用儲存位置。 如果未設定,系統將預設為管線儲存位置。
COMMENT

資料表的選用描述。
column_constraint

資料行上的可選資訊主索引鍵或外部索引鍵條件約束。
MASK clause (公開預覽)

新增資料行 mask 函式來匿名敏感性資料。 該資料行的未來查詢會傳回已評估函式的結果,而非資料行的原始值。 這適用於更細緻的存取控制,因為函式會檢查使用者的身分識別和群組成員資格,以決定是否要修訂該值。

請參閱資料行遮罩子句
table_constraint

資料表上的可選資訊主索引鍵或外部索引鍵條件約束。
TBLPROPERTIES

資料表之資料表屬性的選用清單。
WITH ROW FILTER clause (公開預覽)

將資料列篩選函數新增至資料表。 該資料表的未來查詢會接收函式評估為 TRUE 的資料列子集。 這適用於更細緻的存取控制,因為它允許函式檢查叫用使用者的身分識別和/或群組成員資格,以決定是否篩選某些資料列。

請參閱 ROW FILTER 子句
select_statement

定義資料表資料集的 Delta Live Tables 查詢。
CONSTRAINT 子句
EXPECT expectation_name

定義資料品質條件約束 expectation_name。 如果未定義 ON VIOLATION 條件約束,請將違反條件約束的資料列新增至目標資料集。
ON VIOLATION

針對失敗的資料列採取的可選動作:

- FAIL UPDATE:立即停止管線執行。
- DROP ROW:卸除記錄並繼續處理。

使用 Delta Live Tables 中的 SQL 異動資料擷取

使用 APPLY CHANGES INTO 陳述式來使用 Delta Live Tables CDC 功能,如下所述:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

可以使用與非 APPLY CHANGES 查詢相同的 CONSTRAINT 子句來定義 APPLY CHANGES 目標的資料品質條件約束。 請參閱 使用資料流程期望來管理資料品質

注意

INSERTUPDATE 事件的預設行為是從來源中 upsert CDC 事件:更新目標資料表中符合指定索引鍵的任何資料列,或在目標資料表中不存在相符記錄時插入新資料列。 您可以使用 DELETE 條件來指定 APPLY AS DELETE WHEN 事件的處理。

重要

您必須宣告目標串流資料表,才能套用變更。 您可以選擇性地指定目標資料表的結構描述。 指定 APPLY CHANGES 目標資料表的結構描述時,您必須包含 __START_AT 與具有與 __END_AT 欄位相同資料類型的 sequence_by 資料行。

請參閱套用變更 API:使用差異即時資料表簡化異動資料擷取

子句
KEYS

唯一識別來源資料中某資料列的資料行或資料行組合。 這可用來識別哪些 CDC 事件會套用至目標資料表中的特定記錄。

若要定義資料行的組合,請使用以逗號分隔的資料行清單。

此子句是必需的。
IGNORE NULL UPDATES

允許內嵌包含目標資料行子集的更新。 當 CDC 事件符合現有的資料列且指定 IGNORE NULL UPDATES 時,具有 null 的資料行將在目標中保留現有值。 這也適用於值為 null 的巢狀資料行。

這個子句是選用的。

預設會使用 null 值覆寫現有的資料行。
APPLY AS DELETE WHEN

指定應將 CDC 事件視為 DELETE 而非 upsert 的時機。 為了處理順序錯誤的資料,已刪除的資料列會暫時保留為基礎差異資料表中的標記,而檢視會在中繼存放區中建立,以篩選掉這些標記。 您可以使用
pipelines.cdc.tombstoneGCThresholdInSeconds 資料表屬性來設定保留間隔。

這個子句是選用的。
APPLY AS TRUNCATE WHEN

指定應將 CDC 事件視為完整資料表 TRUNCATE 的時機。 因為這個子句會觸發目標資料表的完整截斷,所以應該只用於需要這項功能的特定使用案例。

只有 SCD 類型 1 才支援 APPLY AS TRUNCATE WHEN 子句。 SCD 類型 2 不支援截斷操作。

這個子句是選用的。
SEQUENCE BY

指定來源資料中 CDC 事件邏輯順序的資料行名稱。 差異即時資料表會使用此排序來處理依序抵達的變更事件。

指定的數據行必須是可排序的數據類型。

此子句是必需的。
COLUMNS

指定要包含在目標資料表中的資料行子集。 您可以:

- 指定要包含的資料行完整清單:COLUMNS (userId, name, city)
- 指定要排除的資料行清單:COLUMNS * EXCEPT (operation, sequenceNum)

這個子句是選用的。

預設操作是當未指定 COLUMNS 子句時應包含目標資料表中的所有資料行。
STORED AS

是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。

這個子句是選用的。

預設類型為 SCD 類型 1。
TRACK HISTORY ON

指定輸出資料行的子集,以在這些指定的資料行有任何變更時生成歷史記錄。 您可以:

- 指定要追蹤之資料行的完整清單:COLUMNS (userId, name, city)
- 指定要從追蹤中排除的資料列清單:COLUMNS * EXCEPT (operation, sequenceNum)

這個子句是選用的。 當有任何變更時,預設操作是追蹤所有輸出資料列的歷史記錄,相當於 TRACK HISTORY ON *