共用方式為


建立串流資料表

適用於:核取記號為「是」 Databricks SQL

建立串流資料表,這是一個 Delta 資料表,它額外支援串流或增量資料處理。

串流資料表僅在 Delta Live Tables 和具有 Unity Catalog 的 Databricks SQL 中受支援。 在受支援的 Databricks Runtime 計算上執行此命令只會剖析語法。 請參閱 使用 SQL 開發管線程式代碼。

語法

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

參數

  • REFRESH

    如果已指定,請使用查詢中所定義來源中可用的最新資料重新整理資料表。 只會處理在查詢開始之前到達的新資料。 在命令執行期間新增至來源的新資料會被忽略,直到下一次重新整理為止。 CREATE OR REFRESH 的重新整理操作是完全宣告式的。 如果重新整理命令未指定原始資料表建立陳述式中的所有中繼資料,則會刪除未指定的中繼資料。

  • IF NOT EXISTS

    如果串流資料表不存在,則會建立。 如果此名稱的資料表已經存在,則會忽略 CREATE STREAMING TABLE 陳述式。

    您最多可以指定 IF NOT EXISTSOR REFRESH 中的一個。

  • table_name

    要建立的資料表的名稱。 名稱不得包含 時態規格或選項規格。 如果名稱不合格,則會在目前的結構描述中建立資料表。

  • table_specification

    此選擇性子句會定義資料行清單、其類型、屬性、描述和資料行條件約束。

    如果您未定義資料表結構描述的資料列,則必須指定 AS query

    • column_identifier

      資料行的唯一名稱。

      • column_type

        指定資料行的資料類型

      • NOT NULL

        如果已指定,則資料行不接受 NULL 值。

      • COMMENT column_comment

        描述資料行的字串常值。

      • column_constraint

        重要

        這項功能處於公開預覽狀態

        將主索引鍵或外部索引鍵條件約束加入串流資料表中的資料行。 hive_metastore 目錄中的資料表不支持條件約束。

      • MASK 子句

        重要

        這項功能處於公開預覽狀態

        新增資料行 mask 函式來匿名敏感性資料。 該資料行的所有後續查詢都會收到針對資料行中該函式的評估結果,而不是該資料行的原始值。 這適用於更細緻的存取控制,其中函式可以檢查叫用使用者的身分識別和/或群組成員資格,以決定是否要修訂該值。

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        將資料品質預期加入資料表。 可以隨時間追蹤這些資料品質預期,並透過串流資料表的事件記錄檔來存取。 FAIL UPDATE 預期會導致在建立資料表以及重新整理資料表時,處理失敗。 如果不符合預期,DROP ROW 預期會導致卸除整個資料列。

        expectation_expr 可能由常值、資料表內的資料行識別碼以及確定性內建 SQL 函式或運算子組成,但下列函式除外:

        expr 也不得包含任何子查詢

      • table_constraint

        重要

        這項功能處於公開預覽狀態

        將資訊主索引鍵或資訊外部索引鍵條件約束加入串流資料表。 hive_metastore 目錄中的資料表不支援索引鍵條件約束。

  • table_clauses

    選擇性地指定新資料表的資料分割、註解、使用者定義的屬性以及重新整理排程。 每個次子句只能指定一次。

    • PARTITIONED BY

      資料表資料行的可選清單,據此分割資料表。

    • COMMENT table_comment

      用於描述資料表的 STRING 常值。

    • TBLPROPERTIES

      選擇性地設定一個或多個使用者定義的屬性。

      使用此設定來指定用來執行此語句的 Delta Live Tables 執行時間通道。 將屬性的值 pipelines.channel 設定為 "PREVIEW""CURRENT"。 預設值是 "CURRENT"。 如需 Delta Live Tables 通道的詳細資訊,請參閱 Delta Live Tables 運行時間通道

    • SCHEDULE [ REFRESH ] schedule_clause

      • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

        重要

        這項功能處於公開預覽狀態

        若要排程定期發生的重新整理,請使用 EVERY 語法。 如果 EVERY 指定語法,則資料串流資料表或具體化檢視會根據所提供的值定期重新整理指定的間隔,例如 HOURHOURSDAYDAYSWEEKWEEKS。 下表列出 接受的 number整數值。

        Time unit 整數值
        HOUR or HOURS 1 <= H <= 72
        DAY or DAYS 1 <= D <= 31
        WEEK or WEEKS 1 <= W <= 8

        注意

        內含時間單位的單數和複數形式在語意上相等。

      • CRON cron_string [ AT TIME ZONE timezone_id ]

        使用晶體 cron 值來排程重新整理。 接受有效的 time_zone_values 。 不支援 AT TIME ZONE LOCAL

        如果 AT TIME ZONE 不存在,則會使用工作階段時區。 如果 AT TIME ZONE 不存在且未設定工作階段時區,則會擲回錯誤。 SCHEDULE 在語意上相當於 SCHEDULE REFRESH

      排程可作為 CREATE 命令的一部分提供。 使用 ALTER STREAMING TABLE 或執行具有 SCHEDULE 子句的 CREATE OR REFRESH 命令,在建立之後改變串流資料表的排程。

    • WITH ROW FILTER 子句

      重要

      這項功能處於公開預覽狀態

      將資料列篩選函數新增至資料表。 該資料表的所有後續查詢都會接收函式評估為布爾 TRUE 的資料列子集。 這適用於更細緻的存取控制,其中函式可以檢查叫用使用者的身分識別或群組成員資格,以決定是否要篩選某些資料列。

  • AS 查詢

    此子句會使用 query 中的資料填入資料表。 此查詢必須是串流查詢。 可以將 STREAM 關鍵字新增至您想要累加處理的任何關聯,以達成此目的。 當您一起指定 querytable_specification 時,table_specification 中指定的資料表結構描述必須包含 query 傳回的所有資料行,否則您會收到錯誤。 在 table_specification 中指定但 query 未傳回的任何資料行在查詢時都會傳回 null 值。

串流資料表與其他資料表之間的差異

串流資料表是具狀態的資料表,其設計目的是在您處理不斷增長的資料集時,只處理每個資料列一次。 由於大部分資料集會隨著時間持續增長,因此串流資料表適合大部分擷取工作負載。 串流資料表最適合需要資料新鮮度和低延遲的管線。 串流資料表也可用於大規模轉換,因為可以在新資料到達時以累加方式計算結果,讓結果保持最新狀態,而不需要在每次更新時完全重新計算所有來源資料。 串流資料表是針對僅附加的資料來源而設計。

串流資料表會接受其他命令,例如 REFRESH,它會處理查詢中所提供來源中可用的最新資料。 對提供的查詢進行的變更只會透過呼叫 REFRESH 反映在新資料上,而不是先前處理過的資料。 若也要對現有資料套用變更,需要執行 REFRESH TABLE <table_name> FULL 以執行 FULL REFRESH。 完全重新整理會以最新的定義重新處理來源中所有可用的資料。 不建議在不保留整個資料歷程記錄或保留期間很短的來源 (例如 Kafka) 上呼叫完全重新整理,因為完全重新整理會截斷現有資料。 如果資料來源中的資料不再可供使用,您可能無法復原舊資料。

資料列篩選器和資料行遮罩

重要

這項功能處於公開預覽狀態

每當資料表掃描擷取資料列時,資料列篩選可讓您指定套用為篩選條件的函式。 這些篩選條件可確保後續查詢只會傳回篩選條件述詞評估為 true 的資料列。

每當資料表掃描擷取資料列時,資料行遮罩可讓您遮罩資料行的值。 涉及該資料行的所有未來查詢都會收到針對資料行評估該函式的結果,而不是取代該資料行的原始值。

如需有關如何使用資料列篩選和資料行遮罩的詳細資訊,請參閱使用資料列篩選和資料行遮罩篩選敏感資料表資料

管理資料列篩選和資料行遮罩

應該透過 CREATE OR REFRESH 陳述式來新增、更新或卸除串流資料表上的資料列篩選和資料行遮罩。

行為

  • 以定義者身分重新整理:當 CREATE OR REFRESHREFRESH 陳述式重新整理串流資料表時,資料列篩選函式會以定義者的權限執行 (作為資料表擁有者)。 這表示資料表重新整理會使用建立串流資料表之使用者的安全性內容。
  • 查詢:雖然大部分篩選都會以定義者的權限執行,但檢查使用者內容的函式 (例如 CURRENT_USERIS_MEMBER) 是例外狀況。 會以叫用者的身分執行這些函式。 此方法會根據目前使用者的內容強制執行使用者特定的資料安全性和存取控制。

可檢視性

使用 DESCRIBE EXTENDEDINFORMATION_SCHEMA 或目錄總管來檢查套用至指定串流資料表的現有資料列篩選和資料行遮罩。 此功能可讓使用者稽核及檢閱串流資料表上的資料存取和保護措施。

限制

  • 只有資料表擁有者才能重新整理串流資料表以取得最新資料。

  • 串流資料表不允許 ALTER TABLE 命令。 應該透過 CREATE OR REFRESHALTER STREAMING TABLE 陳述式來改變資料表的定義和屬性。

  • 不支援時間旅行查詢。

  • 不支援透過 INSERT INTOMERGE 之類的 DML 命令來演進資料表結構描述。

  • 串流資料表不支援下列命令:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • 不支援 Delta Sharing。

  • 不支援重新命名資料表或變更擁有者。

  • 不支援諸如 PRIMARY KEYFOREIGN KEY 的資料表條件約束。

  • 不支援生成的資料行、識別資料行和預設資料行。

範例

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')