CREATE STREAMING TABLE
適用対象: Databricks SQL
ストリーミングまたは増分データ処理を追加でサポートする Delta テーブルである "ストリーミング テーブル" を作成します。
ストリーミング テーブルは、Delta Live Tables と Databricks SQL (と Unity Catalog) でのみサポートされます。 サポートされている Databricks Runtime コンピューティングでこのコマンドを実行すると、構文のみが解析されます。 SQL Develop パイプライン コードを参照してください。
構文
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
パラメーター
REFRESH
指定した場合、クエリで定義されているソースから利用できる最新のデータでテーブルが更新されます。 クエリが開始される前に到着した新しいデータのみが処理されます。 コマンドの実行中にソースに追加される新しいデータは次の更新まで無視されます。 CREATE OR REFRESH からの更新操作は完全に宣言型です。 更新コマンドで元のテーブル作成ステートメントのすべてのメタデータが指定されていない場合、指定されていないメタデータは削除されます。
IF NOT EXISTS
ストリーミング テーブルが存在しない場合は作成します。 この名前のビューが既に存在する場合、
CREATE STREAMING TABLE
ステートメントは無視されます。IF NOT EXISTS
かOR REFRESH
のいずれか 1 つだけを指定できます。-
作成されるテーブルの名前。 名前には、 時仕様またはオプション指定を含めてはなりません。 名前が修飾されていない場合、テーブルは現在のスキーマに作成されます。
table_specification
この省略可能な句で、列、その型、プロパティ、説明、および列制約の一覧を定義します。
テーブル スキーマで列を定義しない場合、
AS query
を指定する必要があります。-
列の一意の名前。
-
列のデータ型を指定します。
NOT NULL
指定した場合、列は
NULL
値を受け取りません。COMMENT column_comment
列について説明する文字列リテラル。
-
重要
この機能はパブリック プレビュー段階にあります。
ストリーミング テーブル内の列に主キーまたは外部キー制約を追加します。 制約は、
hive_metastore
カタログ内のテーブルではサポートされていません。 -
重要
この機能はパブリック プレビュー段階にあります。
列マスク関数を追加して、機密データを匿名化します。 その列からの後続のすべてのクエリは、列の元の値の代わりに、列に対してその関数の評価結果を受け取ります。 これは、関数が呼び出し元ユーザーの ID またはグループ メンバーシップを検査して、値を編集するかどうかを決定できる、きめ細かいアクセス制御に役立ちます。
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
テーブルにデータ品質の期待値を追加します。 このデータ品質の期待値は、一定期間追跡し、ストリーミング テーブルのイベント ログを介してアクセスできます。 テーブルの作成時とテーブル更新時の両方で、
FAIL UPDATE
期待値により処理が失敗します。DROP ROW
期待値が満たされない場合、行全体が削除されます。expectation_expr
は、以下のものを除く、リテラル、テーブル内の列識別子、および決定論的な組み込みの SQL 関数または演算子で構成される場合があります。また
expr
には、サブクエリを含めることはできません。-
重要
この機能はパブリック プレビュー段階にあります。
情報主キーまたは情報外部キーの制約をストリーミング テーブルに追加します。 主な制約は、
hive_metastore
カタログ内のテーブルに対してはサポートされません。
-
-
table_clauses
必要に応じて、パーティション分割、コメント、ユーザー定義プロパティ、新しいテーブルの更新スケジュールを指定します。 各サブ句は、1 回だけ指定できます。
-
テーブルをパーティション分割するための、テーブルの列の省略可能な一覧。
COMMENT table_comment
テーブルについて説明する
STRING
リテラル。-
必要に応じて、1 つ以上のユーザー定義プロパティを設定します。
この設定を使用して、このステートメントの実行に使用する Delta Live Tables ランタイム チャネルを指定します。
pipelines.channel
プロパティの値を"PREVIEW"
または"CURRENT"
に設定します。 既定値は"CURRENT"
です。 Delta Live Tables チャネルの詳細については、「 Delta Live Tables ランタイム チャネルを参照してください。 SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
重要
この機能はパブリック プレビュー段階にあります。
定期的に更新をスケジュールするには、
EVERY
構文を使用します。EVERY
構文が指定されている場合、ストリーミング テーブルまたは具体化されたビューは、指定された値 (HOUR
、HOURS
、DAY
、DAYS
、WEEK
、WEEKS
など) に基づいて、指定した間隔で定期的に更新されます。 次の表に、number
に使用できる整数値を示します。Time unit 整数値 HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Note
含まれる時間単位の単数形と複数形は、意味的に同等です。
CRON cron_string [ AT TIME ZONE timezone_id ]
quartz cron 値を使用して更新をスケジュールします。 有効な time_zone_values が受け入れられます。
AT TIME ZONE LOCAL
はサポートされません。AT TIME ZONE
が存在しない場合は、セッション タイム ゾーンが使用されます。AT TIME ZONE
が存在せず、セッション タイム ゾーンも設定されていない場合は、エラーがスローされます。SCHEDULE
は意味的にSCHEDULE REFRESH
と同等です。
スケジュールは
CREATE
コマンドの一部として指定できます。 作成後に、ストリーミング テーブルのスケジュールを変更するには ALTER STREAMING TABLE またはCREATE OR REFRESH
コマンドをSCHEDULE
句を指定して実行します。WITH ROW FILTER 句
重要
この機能はパブリック プレビュー段階にあります。
行フィルター関数をテーブルに追加します。 そのテーブルからの後続のすべてのクエリは、関数がブール値 TRUE に評価する行のサブセットを受け取ります。 これは、関数が呼び出したユーザーの ID またはグループ メンバーシップを検査して、特定の行をフィルター処理するかどうかを決定できる、きめ細かいアクセス制御に役立ちます。
-
AS クエリ
この句により、
query
からデータがテーブルに入力されます。 このクエリはストリーミング クエリにする必要があります。 そのためには増分的に処理するリレーションにSTREAM
キーワードを追加します。query
とtable_specification
を一緒に指定するとき、table_specification
に指定されているテーブル スキーマに、query
から返される列をすべて含める必要があります。含まれていない場合、エラーが出ます。table_specification
で指定されているが、query
から返されない列はクエリ時にnull
値を返します。
ストリーミング テーブルと他のテーブルの違い
ストリーミング テーブルはステートフル テーブルであり、増加するデータセットを処理するときに各行を 1 回だけ処理するように設計されています。 ほとんどのデータセットは時間が経過するにつれて増大し続けるため、ストリーミング テーブルは、大半のインジェスト ワークロードに適しています。 ストリーミング テーブルは、データの鮮度と待ち時間の短さが要求されるパイプラインに最適です。 また、非常に大規模な変換を行う用途にも適しています。これは、新しいデータが入ってくるのに応じて増分方式で結果を計算し続けて最新の状態に保つことができ、更新のたびにソース データ全体を再計算する必要がないためです。 ストリーミング テーブルは追加専用のデータ ソースを想定して設計されています。
ストリーミング テーブルは、REFRESH
などの追加コマンドを受け取ります。このコマンドは、クエリで提供されるソースで利用できる最新のデータを処理します。 指定されたクエリに対する変更は、以前に処理されたデータではなく、REFRESH
を呼び出すことによって新しいデータにのみ反映されます。 既存のデータにも変更を適用するには、FULL REFRESH
を実行するために REFRESH TABLE <table_name> FULL
を実行する必要があります。 完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り詰められるため、データの履歴全体を保持しないソースや、Kafka など、保持期間が短いソースの場合、完全更新の呼び出しは推奨されません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。
行フィルターと列マスク
重要
この機能はパブリック プレビュー段階にあります。
行フィルターを使用すると、テーブル スキャンで行がフェッチされるたびにフィルターとして適用される関数を指定できます。 これらのフィルターにより、後続のクエリでフィルター述語が true と評価される行のみが返されるようになります。
列マスクを使用すると、テーブル スキャンで行がフェッチされるたびに列の値をマスクできます。 その列に関連する今後のすべてのクエリでは、列の元の値を置き換えて、列に対してその関数を評価した結果が返されます。
行フィルターと列マスクの使用方法の詳細については、「行フィルターと列マスクを使って機密性の高いテーブル データをフィルター処理する」を参照してください。
行フィルターと列マスクの管理
ストリーミング テーブルの行フィルターと列マスクは、CREATE OR REFRESH
ステートメントを通じて追加、更新、または削除する必要があります。
Behavior
- 定義者として更新:
CREATE OR REFRESH
またはREFRESH
ステートメントがストリーミング テーブルを更新すると、行フィルター関数は定義者の権限で (テーブル所有者として) 実行されます。 つまり、テーブルの更新では、ストリーミング テーブルを作成したユーザーのセキュリティ コンテキストが使用されます。 - クエリ: ほとんどのフィルターは定義者の権限で実行されますが、ユーザー コンテキストをチェックする関数 (
CURRENT_USER
やIS_MEMBER
など) は例外です。 これらの関数は呼び出し元として実行されます。 このアプローチでは、現在のユーザーのコンテキストに基づいて、ユーザー固有のデータ セキュリティとアクセス制御が適用されます。
可観測性
DESCRIBE EXTENDED
、INFORMATION_SCHEMA
、またはカタログ エクスプローラーを使用して、特定のストリーミング テーブルに適用される既存の行フィルターと列マスクを調べます。 この機能により、ユーザーはストリーミング テーブルのデータ アクセスと保護対策を監査および確認できます。
制限事項
テーブル所有者だけがストリーミング テーブルを更新して最新のデータを取得できます。
ALTER TABLE
コマンドはストリーミング テーブルでは許可されません。 テーブルの定義とプロパティは、CREATE OR REFRESH
または ALTER STREAMING TABLE ステートメントを使用して変更する必要があります。タイム トラベル クエリはサポートされていません。
INSERT INTO
やMERGE
などの DML コマンドを利用してテーブル スキーマを導き出すことはできません。ストリーミング テーブルでは、次のコマンドはサポートされていません。
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Delta Sharing はサポートされていません。
テーブルの名前変更や所有者の変更はサポートされていません。
PRIMARY KEY
やFOREIGN KEY
などのテーブル制約はサポートされていません。生成された列、ID 列、既定の列はサポートされていません。
例
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')