Delta Live Tables の SQL 言語リファレンス
この記事では、Delta Live Tables の SQL プログラミング インターフェイスについて詳しく説明します。
- Python API の詳細については、「Delta Live Tables の Python 言語リファレンス」を参照してください。
- SQL コマンドの詳細については、「SQL 言語リファレンス」を参照してください。
SQL クエリでは Python ユーザー定義関数 (UDF) を使用できますが、これらの UDF は、SQL ソース ファイルで呼び出す前に Python ファイルで定義する必要があります。 「ユーザー定義スカラー関数 - Python」を参照してください。
制限事項
PIVOT
句はサポートされていません。 Spark での pivot
操作では、出力スキーマを計算するために、入力データを積極的に読み込む必要があります。 この機能は、Delta Live Tables ではサポートされていません。
Delta Live Tables の具体化されたビューまたはストリーミング テーブルを作成する
Note
- 具体化されたビューを作成するための
CREATE OR REFRESH LIVE TABLE
構文は非推奨です。 代わりにCREATE OR REFRESH MATERIALIZED VIEW
を使用してください。 CLUSTER BY
句を使用してリキッド クラスタリングを有効にするには、プレビュー チャンネル を使用するようにパイプラインを構成する必要があります。
ストリーミング テーブルまたは具体化されたビューを宣言するときは、同じ基本的な SQL 構文を使用します。
SQL を使用して Delta Live Tables 具体化されたビューを宣言する
SQL を使用して Delta Live Tables で具体化されたビューを宣言するための構文を次に示します。
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
SQL を使用して Delta Live Tables ストリーミング テーブルを宣言する
ストリーミング テーブルは、ストリーミング ソースに対して読み取るクエリを使用してのみ宣言できます。 Databricks では、クラウド オブジェクト ストレージからのファイルのストリーミング インジェストに自動ローダーの使用が推奨されています。 「自動ローダー SQL 構文」を参照してください。
パイプライン内の他のテーブルまたはビューをストリーミング ソースとして指定する場合は、データセット名の周囲に STREAM()
関数を含める必要があります。
SQL を使用して Delta Live Tables でストリーミング テーブルを宣言するための構文を次に示します。
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Delta Live Tables ビューを作成する
SQL を使用してビューを宣言するための構文を次に示します。
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
自動ローダー SQL 構文
SQL で自動ローダーを操作するための構文を次に示します。
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
自動ローダーではサポートされている形式オプションを使用できます。 map()
関数を使用すると、read_files()
メソッドにオプションを渡すことができます。 オプションはキーと値のペアで、キーと値は文字列です。 サポート形式とオプションの詳細については、「ファイル形式のオプション」を参照してください。
例: テーブルを定義する
データセットを作成するには、外部データ ソースを読み取るか、パイプラインで定義されているデータセットを読み取ります。 内部データセットから読み取るには、データセット名の前にキーワード LIVE
を追加します。 次の例では、2 つの異なるデータセットを定義します。JSON ファイルを入力ソースとして受け取る taxi_raw
というテーブルと、taxi_raw
テーブルを入力として受け取る filtered_data
というテーブルです。
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
例: ストリーミング ソースから読み取る
自動ローダーや内部データセットなどのストリーミング ソースからデータを読み取るために、 STREAMING
テーブルを定義します。
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
ストリーミング データの詳細については、「Delta Live Tables を使用してデータを変換する」を参照してください。
テーブルが具体化される方法を制御する
テーブルには、その具体化の追加のコントロールも用意されています。
PARTITIONED BY
を使用してテーブルをパーティション分割する方法を指定します。 パーティション分割を使用すると、クエリを高速化できます。- テーブルのプロパティは、
TBLPROPERTIES
を使用して設定できます。 「Delta Live Tables テーブルのプロパティ」を参照してください。 LOCATION
設定を使用して、保存場所を設定します。 既定では、LOCATION
が設定されていない場合、テーブル データはパイプラインの保存場所に格納されます。- 生成された列をスキーマ定義で使用できます。 「例: スキーマ列とパーティション列を指定する」を参照してください。
Note
サイズが 1 TB 未満のテーブルの場合、Databricks では、Delta Live Tables でデータ編成を制御できるようにすることをお勧めします。 テーブルのサイズが 1 テラバイトを超えて拡大することが予想される場合を除き、Databricks ではパーティション列を指定しないことを推奨しています。
例: スキーマ列とパーティション列を指定する
必要に応じて、テーブルを定義するときにスキーマを指定できます。 次の例では、Delta Lake で生成された列の使用やテーブルのパーティション列の定義など、ターゲット テーブルのスキーマを指定します。
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
既定では、スキーマが指定されなかった場合、Delta Live Tables は、table
定義からスキーマを推論します。
例: テーブル制約を定義する
Note
テーブル制約に関する Delta Live Tables のサポートは、パブリック プレビュー段階にあります。 テーブル制約を定義するには、パイプラインが Unity Catalog 対応のパイプラインであり、preview
チャネルを使用するように構成されている必要があります。
スキーマを指定する際には、主キーと外部キーを定義できます。 制約は情報提供のみであり、強制されるものではありません。 SQL 言語リファレンスの CONSTRAINT 句を参照してください。
次の例では、主キーと外部キーの制約を使用してテーブルを定義します。
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
SQL でテーブルまたはビューを宣言するときに使用される値をパラメーター化する
Spark 構成を含むテーブルまたはビューを宣言するクエリで構成値を指定するためには、SET
を使用します。 SET
ステートメントが定義された値にアクセスできるようになったら、ノートブックで任意のテーブルまたはビューを定義します。 SET
ステートメントを使用して指定されたすべての Spark 構成は、SET ステートメントに続く任意のテーブルまたはビューに対して Spark クエリを実行するときに使用されます。 クエリで構成値を読み取るには、文字列補間構文 ${}
を使用します。 次の例では、startDate
という名前の Spark 構成値を設定し、その値をクエリで使用します。
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の構成値を指定するには、値ごとに個別の SET
ステートメントを使用します。
例: 行フィルターと列マスクの定義
重要
行フィルターと列マスクは、 パブリック プレビュー段階にあります。
行フィルターと列マスクを使用して具体化されたビューまたはストリーミング テーブルを作成するには、 ROW FILTER 句 および MASK 句を使用します。 次の例では、マテリアライズド ビューと、行フィルターと列マスクの両方を使用してストリーミング テーブルを定義する方法を示します。
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze
行フィルターと列マスクの詳細については、「行フィルターと列マスクを使用してテーブルを公開する」を参照してください。
SQL のプロパティ
Note
CLUSTER BY
句を使用してリキッド クラスタリングを有効にするには、プレビュー チャンネル を使用するようにパイプラインを構成する必要があります。
CREATE TABLE または VIEW |
---|
TEMPORARY テーブルは作成しますが、テーブルのメタデータは公開しません。 TEMPORARY 句では、パイプラインで使用できるものの、パイプラインの外部ではアクセスできないテーブルを作成するように Delta Live Tables に指示します。 処理時間を短縮するために、一時テーブルは、単一の更新だけでなく、それを作成するパイプラインの有効期間中保持されます。 |
STREAMING 入力データセットをストリームとして読み取るテーブルを作成します。 入力データセットは、自動ローダーや STREAMING テーブルなどのストリーミング データ ソースである必要があります。 |
CLUSTER BY テーブルでリキッド クラスタリングを有効にし、クラスタリング キーとして使用する列を定義してください。 「Delta テーブルに Liquid Clustering クラスタリングを使用する」を参照してください。 |
PARTITIONED BY テーブルのパーティション分割に使用する 1 つ以上の列の省略可能なリスト。 |
LOCATION テーブル データの省略可能な保存場所。 設定されていない場合、システムは既定でパイプラインの保存場所に設定します。 |
COMMENT テーブルの省略可能な説明。 |
column_constraint 列に対するオプションの情報主キーまたは外部キー制約。 |
MASK clause (パブリック プレビュー)列マスク関数を追加して、機密データを匿名化します。 その列の今後のクエリでは、列の元の値でなく、評価された関数の結果が返されます。 これは、関数がユーザーの ID とグループ メンバーシップをチェックして値を編集するかどうかを決定できるため、きめ細かいアクセス制御に役立ちます。 「列マスク句」を参照してください |
table_constraint テーブルに対するオプションの情報主キーまたは外部キー制約。 |
TBLPROPERTIES テーブルのテーブル プロパティの省略可能なリスト。 |
WITH ROW FILTER clause (パブリック プレビュー)行フィルター関数をテーブルに追加します。 今後そのテーブルのクエリでは、関数が TRUE に評価される行のサブセットを受け取ります。 これは、関数が呼び出し元ユーザーの ID およびグループ メンバーシップを検査して、特定の行をフィルター処理するかどうかを決定できるため、きめ細かいアクセス制御に役立ちます。 「ROW FILTER 句」を参照してください。 |
select_statement テーブルのデータセットを定義する Delta Live Tables クエリ。 |
CONSTRAINT 句 |
---|
EXPECT expectation_name データ品質制約 expectation_name を定義します。 この ON VIOLATION 制約が定義されていない場合は、制約に違反する行をターゲット データセットに追加します。 |
ON VIOLATION 失敗した行に対して実行する省略可能なアクション: - FAIL UPDATE : パイプラインの実行を直ちに停止します。- DROP ROW : レコードをドロップし、処理を続行します。 |
Delta Live Tables の SQL を使用した変更データ キャプチャ
APPLY CHANGES INTO
ステートメントを使用して、次の説明で説明されているように、Delta Live Tables CDC 機能を使用します。
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
非 APPLY CHANGES
クエリと同じ CONSTRAINT
句を使用して、APPLY CHANGES
ターゲットのデータ品質制約を定義します。 「Delta Live Tables を使用してデータ品質を管理する」を参照してください。
Note
INSERT
イベントと UPDATE
イベントの既定の動作では、ソースから CDC イベントを upsert します。指定したキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。 DELETE
イベントの処理は、APPLY AS DELETE WHEN
条件で指定できます。
重要
変更を適用する対象のターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 APPLY CHANGES
ターゲット テーブルのスキーマを指定する場合は、sequence_by
フィールドと同じデータ型で __START_AT
および __END_AT
列も含める必要があります。
APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化に関する記事を参照してください。
句 |
---|
KEYS ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。 列の組み合わせを定義するには、列のコンマ区切りのリストを使用します。 この句は必須です。 |
IGNORE NULL UPDATES ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、 null を持つ列はターゲット内の既存の値を保持します。 これは、null の値を持つ入れ子になった列にも適用されます。この句は省略可能です。 既定では、既存の列を null 値で上書きします。 |
APPLY AS DELETE WHEN CDC イベントをアップサートでなく DELETE として扱う必要がある場合に指定します。 順序の誤ったデータを処理するために、削除された行は基になる Delta テーブルの廃棄標識として一時的に保持され、これらの廃棄標識をフィルターで除外するビューがメタストアに作成されます。 データ保持間隔は次を使用して構成できます。pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティ。この句は省略可能です。 |
APPLY AS TRUNCATE WHEN CDC イベントを完全なテーブル TRUNCATE として扱う必要がある場合に指定します。 この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースでのみ使用する必要があります。APPLY AS TRUNCATE WHEN の句は SCD タイプ 1 のみサポートしています。 SCD タイプ 2 では、切り捨て操作はサポートされていません。この句は省略可能です。 |
SEQUENCE BY ソース データ内の CDC イベントの論理順序を指定する列名。 Delta Live Tables では、このシーケンス処理を使用して、順不同で到着する変更イベントを処理します。 指定された列は、並べ替え可能なデータ型である必要があります。 この句は必須です。 |
COLUMNS ターゲット テーブルに含める列のサブセットを指定します。 次のいずれかを実行できます。 - 含める列の完全な一覧を指定します: COLUMNS (userId, name, city) 。- 除外する列の一覧を指定します: COLUMNS * EXCEPT (operation, sequenceNum) この句は省略可能です。 既定では、 COLUMNS 句が指定されていない場合、ターゲット テーブルのすべての列が含まれます。 |
STORED AS レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。 この句は省略可能です。 既定値は SCD タイプ 1 です。 |
TRACK HISTORY ON 指定された列に変更があった場合に履歴レコードを生成する出力列のサブセットが指定されます。 次のいずれかを実行できます。 - トラッキングする列の完全なリストを指定します: COLUMNS (userId, name, city) 。- トラッキングから除外する列のリストを指定します: COLUMNS * EXCEPT (operation, sequenceNum) この句は省略可能です。 既定で、 TRACK HISTORY ON * と同等の変更がある場合、すべての出力列の履歴が追跡されます。 |