共用方式為


使用 SQL 開發管線程式代碼

Delta Live Tables 引進數個新的 SQL 關鍵詞和函式,用於定義管線中具體化檢視和串流數據表。 開發管線的 SQL 支援是以 Spark SQL 的基本概念為基礎,並新增對結構化串流功能的支援。

熟悉 PySpark DataFrame 的使用者可能會偏好使用 Python 開發管線程序代碼。 Python 支援更廣泛的測試和作業,這些測試與作業難以搭配 SQL 實作,例如中繼程式設計作業。 請參閱 使用 Python 開發管線程式代碼。

如需 Delta Live Tables SQL 語法的完整參考,請參閱 Delta Live Tables SQL 語言參考

用於管線開發的 SQL 基本概念

建立 Delta Live Tables 數據集的 SQL 程式代碼會 CREATE OR REFRESH 使用 語法,針對查詢結果定義具體化檢視和串流數據表。

STREAM關鍵詞會指出是否應該使用串流語意來讀取 子句中所SELECT參考的數據源。

Delta Live Tables 原始碼與 SQL 腳本大相徑庭:Delta Live Tables 會評估管線中設定之所有原始程式碼檔案的所有數據集定義,並在執行任何查詢之前建置數據流圖形。 出現在筆記本或腳本中的查詢順序不會定義執行順序。

使用 SQL 建立具體化檢視

下列程式代碼範例示範使用 SQL 建立具體化檢視的基本語法:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

使用 SQL 建立串流數據表

下列程式代碼範例示範使用 SQL 建立串流資料表的基本語法:

注意

並非所有數據源都支援串流讀取,而且某些數據源一律應該使用串流語意來處理。

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

從物件記憶體載入資料

Delta Live Tables 支援從 Azure Databricks 支援的所有格式載入資料。 請參閱資料格式選項 (機器翻譯)。

注意

這些範例會使用自動掛接至工作區底下 /databricks-datasets 可用的數據。 Databricks 建議使用磁碟區路徑或雲端 URI 來參考儲存在雲端物件記憶體中的數據。 請參閱Unity Catalog 磁碟區是什麼?

Databricks 建議針對儲存在雲端物件記憶體中的數據設定累加擷取工作負載時,使用自動載入器和串流數據表。 請參閱 什麼是自動載入器?

SQL 會使用函 read_files 式來叫用自動載入器功能。 您也必須使用 STREAM 關鍵詞來設定使用 read_files讀取的串流。

下列範例會使用自動載入器從 JSON 檔案建立串流資料表:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

read_files 式也支援批次語意來建立具體化檢視。 下列範例會使用批次語意來讀取 JSON 目錄,並建立具體化檢視:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

使用預期驗證數據

您可以使用預期來設定及強制執行資料品質條件約束。 請參閱使用 Delta Live Tables 管理資料品質 (機器翻譯)。

下列程式代碼定義名為 valid_data 的預期,會在數據擷取期間卸除 Null 的記錄:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

查詢管線中定義的具體化檢視和串流數據表

LIVE使用架構來查詢管線中定義的其他具體化檢視和串流數據表。

下列範例會定義四個資料集:

  • 名為 orders 的串流數據表,會載入 JSON 數據。
  • 名為 customers 的具體化檢視,會載入 CSV 數據。
  • 名為 customer_orders 的具體化檢視,會聯結 和 customers 數據集中的orders記錄、將順序時間戳轉換成日期,然後選取 customer_idorder_numberstateorder_date 字段。
  • daily_orders_by_state 名的具體化檢視,會匯總每個狀態的每日訂單計數。
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;