建立串流資料表
適用於: Databricks SQL
建立串流資料表,這是一個 Delta 資料表,它額外支援串流或增量資料處理。
串流資料表僅在 Delta Live Tables 和具有 Unity Catalog 的 Databricks SQL 中受支援。 在受支援的 Databricks Runtime 計算上執行此命令只會剖析語法。 請參閱 使用 SQL 開發管線程式代碼。
語法
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
參數
REFRESH
如果已指定,請使用查詢中所定義來源中可用的最新資料重新整理資料表。 只會處理在查詢開始之前到達的新資料。 在命令執行期間新增至來源的新資料會被忽略,直到下一次重新整理為止。 CREATE OR REFRESH 的重新整理操作是完全宣告式的。 如果重新整理命令未指定原始資料表建立陳述式中的所有中繼資料,則會刪除未指定的中繼資料。
IF NOT EXISTS
如果串流資料表不存在,則會建立。 如果此名稱的資料表已經存在,則會忽略
CREATE STREAMING TABLE
陳述式。您最多可以指定
IF NOT EXISTS
或OR REFRESH
中的一個。-
要建立的資料表的名稱。 名稱不得包含 時態規格或選項規格。 如果名稱不合格,則會在目前的結構描述中建立資料表。
table_specification
此選擇性子句會定義資料行清單、其類型、屬性、描述和資料行條件約束。
如果您未定義資料表結構描述的資料列,則必須指定
AS query
。-
資料行的唯一名稱。
-
指定資料行的資料類型。
NOT NULL
如果已指定,則資料行不接受
NULL
值。COMMENT column_comment
描述資料行的字串常值。
-
重要
這項功能處於公開預覽狀態。
將主索引鍵或外部索引鍵條件約束加入串流資料表中的資料行。
hive_metastore
目錄中的資料表不支持條件約束。 -
重要
這項功能處於公開預覽狀態。
新增資料行 mask 函式來匿名敏感性資料。 該資料行的所有後續查詢都會收到針對資料行中該函式的評估結果,而不是該資料行的原始值。 這適用於更細緻的存取控制,其中函式可以檢查叫用使用者的身分識別和/或群組成員資格,以決定是否要修訂該值。
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
將資料品質預期加入資料表。 可以隨時間追蹤這些資料品質預期,並透過串流資料表的事件記錄檔來存取。
FAIL UPDATE
預期會導致在建立資料表以及重新整理資料表時,處理失敗。 如果不符合預期,DROP ROW
預期會導致卸除整個資料列。expectation_expr
可能由常值、資料表內的資料行識別碼以及確定性內建 SQL 函式或運算子組成,但下列函式除外:expr
也不得包含任何子查詢。-
重要
這項功能處於公開預覽狀態。
將資訊主索引鍵或資訊外部索引鍵條件約束加入串流資料表。
hive_metastore
目錄中的資料表不支援索引鍵條件約束。
-
-
table_clauses
選擇性地指定新資料表的資料分割、註解、使用者定義的屬性以及重新整理排程。 每個次子句只能指定一次。
-
資料表資料行的可選清單,據此分割資料表。
COMMENT table_comment
用於描述資料表的
STRING
常值。-
選擇性地設定一個或多個使用者定義的屬性。
使用此設定來指定用來執行此語句的 Delta Live Tables 執行時間通道。 將屬性的值
pipelines.channel
設定為"PREVIEW"
或"CURRENT"
。 預設值是"CURRENT"
。 如需 Delta Live Tables 通道的詳細資訊,請參閱 Delta Live Tables 運行時間通道。 SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
重要
這項功能處於公開預覽狀態。
若要排程定期發生的重新整理,請使用
EVERY
語法。 如果EVERY
指定語法,則資料串流資料表或具體化檢視會根據所提供的值定期重新整理指定的間隔,例如HOUR
、HOURS
、DAY
、DAYS
、WEEK
或WEEKS
。 下表列出 接受的number
整數值。Time unit 整數值 HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 注意
內含時間單位的單數和複數形式在語意上相等。
CRON cron_string [ AT TIME ZONE timezone_id ]
使用晶體 cron 值來排程重新整理。 接受有效的 time_zone_values 。 不支援
AT TIME ZONE LOCAL
。如果
AT TIME ZONE
不存在,則會使用工作階段時區。 如果AT TIME ZONE
不存在且未設定工作階段時區,則會擲回錯誤。SCHEDULE
在語意上相當於SCHEDULE REFRESH
。
排程可作為
CREATE
命令的一部分提供。 使用 ALTER STREAMING TABLE 或執行具有SCHEDULE
子句的CREATE OR REFRESH
命令,在建立之後改變串流資料表的排程。WITH ROW FILTER 子句
重要
這項功能處於公開預覽狀態。
將資料列篩選函數新增至資料表。 該資料表的所有後續查詢都會接收函式評估為布爾 TRUE 的資料列子集。 這適用於更細緻的存取控制,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要篩選某些資料列。
-
AS 查詢
此子句會使用
query
中的資料填入資料表。 此查詢必須是串流查詢。 可以將STREAM
關鍵字新增至您想要累加處理的任何關聯,以達成此目的。 當您一起指定query
和table_specification
時,table_specification
中指定的資料表結構描述必須包含query
傳回的所有資料行,否則您會收到錯誤。 在table_specification
中指定但query
未傳回的任何資料行在查詢時都會傳回null
值。
串流資料表與其他資料表之間的差異
串流資料表是具狀態的資料表,其設計目的是在您處理不斷增長的資料集時,只處理每個資料列一次。 由於大部分資料集會隨著時間持續增長,因此串流資料表適合大部分擷取工作負載。 串流資料表最適合需要資料新鮮度和低延遲的管線。 串流資料表也可用於大規模轉換,因為可以在新資料到達時以累加方式計算結果,讓結果保持最新狀態,而不需要在每次更新時完全重新計算所有來源資料。 串流資料表是針對僅附加的資料來源而設計。
串流資料表會接受其他命令,例如 REFRESH
,它會處理查詢中所提供來源中可用的最新資料。 對提供的查詢進行的變更只會透過呼叫 REFRESH
反映在新資料上,而不是先前處理過的資料。 若也要對現有資料套用變更,需要執行 REFRESH TABLE <table_name> FULL
以執行 FULL REFRESH
。 完全重新整理會以最新的定義重新處理來源中所有可用的資料。 不建議在不保留整個資料歷程記錄或保留期間很短的來源 (例如 Kafka) 上呼叫完全重新整理,因為完全重新整理會截斷現有資料。 如果資料來源中的資料不再可供使用,您可能無法復原舊資料。
資料列篩選器和資料行遮罩
重要
這項功能處於公開預覽狀態。
每當資料表掃描擷取資料列時,資料列篩選可讓您指定套用為篩選條件的函式。 這些篩選條件可確保後續查詢只會傳回篩選條件述詞評估為 true 的資料列。
每當資料表掃描擷取資料列時,資料行遮罩可讓您遮罩資料行的值。 涉及該資料行的所有未來查詢都會收到針對資料行評估該函式的結果,而不是取代該資料行的原始值。
如需有關如何使用資料列篩選和資料行遮罩的詳細資訊,請參閱使用資料列篩選和資料行遮罩篩選敏感資料表資料。
管理資料列篩選和資料行遮罩
應該透過 CREATE OR REFRESH
陳述式來新增、更新或卸除串流資料表上的資料列篩選和資料行遮罩。
行為
- 以定義者身分重新整理:當
CREATE OR REFRESH
或REFRESH
陳述式重新整理串流資料表時,資料列篩選函式會以定義者的權限執行 (作為資料表擁有者)。 這表示資料表重新整理會使用建立串流資料表之使用者的安全性內容。 - 查詢:雖然大部分篩選都會以定義者的權限執行,但檢查使用者內容的函式 (例如
CURRENT_USER
和IS_MEMBER
) 是例外狀況。 會以叫用者的身分執行這些函式。 此方法會根據目前使用者的內容強制執行使用者特定的資料安全性和存取控制。
可檢視性
使用 DESCRIBE EXTENDED
、INFORMATION_SCHEMA
或目錄總管來檢查套用至指定串流資料表的現有資料列篩選和資料行遮罩。 此功能可讓使用者稽核及檢閱串流資料表上的資料存取和保護措施。
限制
只有資料表擁有者才能重新整理串流資料表以取得最新資料。
串流資料表不允許
ALTER TABLE
命令。 應該透過CREATE OR REFRESH
或 ALTER STREAMING TABLE 陳述式來改變資料表的定義和屬性。不支援時間旅行查詢。
不支援透過
INSERT INTO
和MERGE
之類的 DML 命令來演進資料表結構描述。串流資料表不支援下列命令:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
不支援 Delta Sharing。
不支援重新命名資料表或變更擁有者。
不支援諸如
PRIMARY KEY
和FOREIGN KEY
的資料表條件約束。不支援生成的資料行、識別資料行和預設資料行。
範例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')