CREATE STREAMING TABLE
適用於: Databricks SQL
建立
串流表格只支援在 Delta Live Tables 和具有 Unity Catalog 的 Databricks SQL 上使用。 在受支援的 Databricks Runtime 計算上執行此命令只會剖析語法。 請參閱 使用 SQL 開發管線程式代碼。
語法
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
參數
REFRESH
如果有指定,請用查詢中所定義來源的最新可用數據重新整理表格。 只會處理在查詢開始之前到達的新資料。 在命令執行期間,會忽略新增至來源的新數據,直到下一次重新整理為止。 CREATE OR REFRESH 的重新整理作業是完全宣告式的。 如果重新整理命令未指定原始數據表建立語句中的所有元數據,則會刪除未指定的元數據。
IF NOT EXISTS
若串流數據表不存在,則建立它。 如果這個名稱的數據表已經存在,則會忽略
CREATE STREAMING TABLE
語句。您最多可以指定
IF NOT EXISTS
或OR REFRESH
中的一個。-
要建立之數據表的名稱。 名稱不得包含 時態規格或選項規格。 如果名稱未經完整限定,將在當前的模式中建立表。
table_specification
這個選擇性子句會定義欄位清單、其類型、屬性、描述和欄位條件約束。
如果您未在資料表架構中定義資料列,則必須指定
AS query
。-
欄的唯一名稱。
-
指定資料行 的資料類型。
NOT NULL
如果已指定,則欄不接受
NULL
值。COMMENT column_comment
描述欄的字串常值。
-
重要
這項功能處於公開預覽狀態。
將主鍵或外鍵條件約束加入串流數據表中的數據行。
hive_metastore
目錄中的數據表不支持條件約束。 -
重要
這項功能處於公開預覽狀態。
新增數據行遮罩函式來匿名敏感數據。 該數據欄的所有後續查詢都會接收到將該函式應用於數據欄後的結果,以取代數據欄的原始值。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要編輯或刪除值。
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE |DROP ROW } ]
將數據品質期望加入表格。 這些數據質量預期可以隨著時間追蹤,並透過串流數據表 事件記錄檔來存取。
FAIL UPDATE
預期會導致在建立數據表以及重新整理數據表時,處理失敗。 如果不符合預期,DROP ROW
預期會導致卸除整個資料列。expectation_expr
可能包含常值、數據表中的數據行標識符,以及決定性的內建 SQL 函式或運算符,但除外:expr
也不得包含任何子查詢。-
重要
這項功能處於公開預覽狀態。
將資訊型主鍵或資訊型外鍵約束加入串流資料表。
hive_metastore
目錄中的資料表不支援主鍵約束。
-
-
table_clauses
選擇性地指定新數據表的數據分割、批注、使用者定義屬性,以及重新整理排程。 每個次子句只能指定一次。
-
用來對表格進行分區的可選欄位列表。
COMMENT table_comment
描述資料表的
STRING
字面值。-
選擇性地設定一個或多個使用者定義的屬性。
使用此設定來指定用來執行此語句的 Delta Live Tables 執行時間通道。 將
pipelines.channel
屬性的值設定為"PREVIEW"
或"CURRENT"
。 預設值是"CURRENT"
。 如需 Delta Live Tables 頻道的詳細資訊,請參閱 Delta Live Tables 執行環境頻道。 時間表 [ REFRESH ] 時間表條款
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
若要排程定期發生的重新整理,請使用
EVERY
語法。 如果指定了EVERY
語法格式,串流資料表或具象化檢視表將根據所提供的值在指定的間隔定期更新,例如HOUR
、HOURS
、DAY
、DAYS
、WEEK
或WEEKS
。 下表列出number
接受的整數值。Time unit 整數值 HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 注意
內含時間單位的單數和複數形式在語意上相等。
CRON cron_string [ AT TIME ZONE timezone_id ]
使用 quartz cron 值來排程重新整理。 接受有效的 time_zone_values 。 不支援
AT TIME ZONE LOCAL
。如果
AT TIME ZONE
不存在,則會使用工作階段時區。 如果AT TIME ZONE
不存在且未設定會話時區,則會擲回錯誤。SCHEDULE
在語意上相當於SCHEDULE REFRESH
。
排程可作為
CREATE
命令的一部分提供。 使用 ALTER STREAMING TABLE 或搭配CREATE OR REFRESH
子句執行SCHEDULE
命令,以在建立之後改變串流數據表的排程。-
-
重要
這項功能處於公開預覽狀態。
將數據列篩選函式加入至數據表。 該數據表中所有後續的查詢都會接收到函式評估為布林值 TRUE 的行子集。 這對於精細訪問控制用途很有用,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要篩選特定數據列。
AS 查詢
這個子句會使用 來自
query
的數據填入數據表。 此查詢必須是串流查詢。 可以將STREAM
關鍵字新增至您想要累加處理的任何關聯,以達成此目的。 當您一起指定query
和table_specification
時,table_specification
中指定的數據表架構必須包含query
傳回的所有數據行,否則您會收到錯誤。table_specification
中指定的任何欄,如果在query
中未被返回,則在查詢時會返回null
值。
串流數據表與其他數據表之間的差異
串流數據表是具狀態的數據表,其設計目的是在您處理成長的數據集時,只處理每個數據列一次。 由於大部分數據集會隨著時間持續成長,串流數據表最適合大部分的擷取工作負載。 串流數據表最適合需要數據新鮮度和低延遲的管線。 串流數據表也可用於大規模轉換,因為結果可以在新數據送達時以累加方式計算,讓結果保持在最新狀態,而不需要使用每個更新完全重新計算所有源數據。 串流數據表是針對僅附加的數據源所設計。
串流數據表接受其他命令,例如 REFRESH
,其會處理查詢中提供的來源中可用的最新數據。 提供的查詢變更只會藉由呼叫 REFRESH
來反映新數據,而不是先前處理過的數據。 若也要對現有資料套用變更,需要執行 REFRESH TABLE <table_name> FULL
以執行 FULL REFRESH
。 完全重新整理會以最新的定義重新處理來源中所有可用的資料。 不建議對那些不保留完整數據歷史或保留時間較短的來源(例如 Kafka)進行完整重新整理,因為完整重新整理會截斷現有的數據。 如果資料來源中的資料不再可供使用,您可能無法復原舊資料。
行篩選和列遮罩
重要
這項功能處於公開預覽狀態。
資料列篩選可讓您指定一個函式,該函式在資料表掃描取出資料列時作為篩選器應用。 這些篩選條件可確保後續查詢只會傳回篩選條件述詞評估為 true 的資料列。
每當數據表掃描擷取數據列時,數據行遮罩可讓您遮罩數據行的值。 涉及該數據行的所有未來查詢都會收到評估數據行函式的結果,並取代數據行的原始值。
如需如何使用數據列篩選和數據行遮罩的詳細資訊,請參閱 使用數據列篩選和數據行遮罩篩選敏感數據。
管理行篩選和列遮罩
串流數據表上的數據列篩選和數據行遮罩應該透過 CREATE OR REFRESH
語句新增、更新或卸除。
行為
-
Refresh as Definer:當
CREATE OR REFRESH
或REFRESH
語句重新整理串流數據表時,數據列篩選函式會以定義者的許可權執行(以數據表擁有者身分)。 這表示資料表重新整理會使用建立串流資料表之使用者的安全性環境。 -
查詢:雖然大部分篩選都會以定義者的權限執行,但檢查使用者內容的函式 (例如
CURRENT_USER
和IS_MEMBER
) 是例外狀況。 會以叫用者的身分執行這些函式。 此方法會根據目前使用者的內容強制執行使用者特定的資料安全性和存取控制。
可檢視性
使用 DESCRIBE EXTENDED
、INFORMATION_SCHEMA
或目錄總管來檢查套用至指定串流數據表的現有數據列篩選和數據行遮罩。 此功能可讓使用者稽核及檢閱串流數據表上的數據存取和保護措施。
限制
- 只有數據表擁有者可以重新整理串流數據表,以取得最新的數據。
- 串流數據表上不允許
ALTER TABLE
命令。 數據表的定義和屬性應該透過CREATE OR REFRESH
或 ALTER STREAMING TABLE 語句來改變。 - 不支援透過像
INSERT INTO
和MERGE
這樣的 DML 命令來修改資料表結構。 - 串流資料表不支援下列命令:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
- 不支援 Delta Sharing。
- 不支援重新命名資料表或變更擁有者。
- 不支持數據表條件約束,例如
PRIMARY KEY
和FOREIGN KEY
。 - 不支援產生的數據行、識別數據行和預設數據行。
範例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')