SQL を使用してパイプライン コードを開発する
Delta Live Tables には、パイプラインで具体化されたビューとストリーミング テーブルを定義するための新しい SQL キーワードと関数がいくつか導入されています。 パイプラインの開発に対する SQL サポートは、Spark SQL の基本に基づいており、構造化ストリーミング機能のサポートが追加されています。
PySpark DataFrames に慣れているユーザーは、Python を使用したパイプライン コードの開発を好む場合があります。 Python では、メタプログラミング操作など、SQL で実装するのが困難な、より広範なテストと操作がサポートされています。 Python を使用した Develop パイプライン コードを参照してください。
Delta Live Tables SQL 構文の完全なリファレンスについては、「 Delta Live Tables SQL 言語リファレンスを参照してください。
パイプライン開発のための SQL の基本
Delta Live Tables データセットを作成する SQL コードでは、 CREATE OR REFRESH
構文を使用して、クエリ結果に対して具体化されたビューとストリーミング テーブルを定義します。
STREAM
キーワードは、SELECT
句で参照されるデータ ソースをストリーミング セマンティクスで読み取る必要があるかどうかを示します。
Delta Live Tables のソース コードは SQL スクリプトとは大きく異なります。Delta Live Tables は、パイプラインで構成されているすべてのソース コード ファイルのすべてのデータセット定義を評価し、クエリを実行する前にデータフロー グラフを構築します。 ノートブックまたはスクリプトに表示されるクエリの順序では、実行順序は定義されません。
SQL を使用して具体化されたビューを作成する
次のコード例は、SQL で具体化されたビューを作成するための基本的な構文を示しています。
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
SQL を使用してストリーミング テーブルを作成する
次のコード例は、SQL を使用してストリーミング テーブルを作成するための基本的な構文を示しています。
Note
すべてのデータ ソースがストリーミング読み取りをサポートしているわけではありません。また、一部のデータ ソースは常にストリーミング セマンティクスで処理する必要があります。
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
オブジェクト ストレージからデータを読み込む
Delta Live Tables は、Azure Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 「データ形式のオプション」を参照してください。
Note
これらの例では、ワークスペースに自動的にマウントされた /databricks-datasets
で使用可能なデータを使用します。 Databricks では、クラウド オブジェクト ストレージに格納されているデータを参照するために、ボリューム パスまたはクラウド URI を使用することをお勧めします。 「Unity Catalog ボリュームとは」を参照してください。
Databricks では、クラウド オブジェクト ストレージに格納されているデータに対して増分インジェスト ワークロードを構成するときに、自動ローダーとストリーミング テーブルを使用することをお勧めします。 「自動ローダー」を参照してください。
SQL では、 read_files
関数を使用して自動ローダー機能を呼び出します。 また、 STREAM
キーワードを使用して、 read_files
でストリーミング読み取りを構成する必要があります。
次の例では、自動ローダーを使用して JSON ファイルからストリーミング テーブルを作成します。
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
read_files
関数では、マテリアライズド ビューを作成するためのバッチ セマンティクスもサポートされています。 次の例では、バッチ セマンティクスを使用して JSON ディレクトリを読み取り、具体化されたビューを作成します。
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
期待値を使用してデータを検証する
期待値を使用して、データ品質の制約を設定および適用できます。 「Delta Live Tables を使用してデータ品質を管理する」を参照してください。
次のコードでは、データ インジェスト中に null レコードを削除する valid_data
という名前の期待値を定義します。
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
パイプラインで定義された具体化されたビューとストリーミング テーブルに対してクエリを実行する
LIVE
スキーマを使用して、パイプラインで定義されている他の具体化されたビューとストリーミング テーブルに対してクエリを実行します。
次の例では、4 つのデータセットを定義します。
- JSON データを読み込む
orders
という名前のストリーミング テーブル。 - CSV データを読み込む
customers
という名前の具体化されたビュー。 orders
およびcustomers
データセットのレコードを結合し、注文タイムスタンプを日付にキャストし、customer_id
、order_number
、state
、およびorder_date
フィールドを選択する、customer_orders
という名前の具体化されたビュー。- 各状態の注文の日次数を集計する
daily_orders_by_state
という名前の具体化されたビュー。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;