Delta Live Tables を使用してデータを変換する
この記事では、Delta Live Tables を使用してデータセットで変換を宣言し、クエリ ロジックを利用したレコードの処理方法を指定する方法について説明します。 また、Delta Live Tables パイプラインを構築するための一般的な変換パターンの例も含まれています。
DataFrame を返す任意のクエリに対してデータセットを定義できます。 Delta Live Tables パイプラインでは、Apache Spark の組み込み操作、UDF、カスタム ロジック、MLflow モデルを変換として使用できます。 データが Delta Live Tables パイプラインに取り込まれた後、アップストリーム ソースに対して新しいデータセットを定義して、新しいストリーミング テーブル、具体化されたビュー、ビューを作成できます。
Delta Live Tables でステートフル処理を効果的に実行する方法については、「Delta Live Tables でウォーターマークを使ってステートフル処理を最適化する」を参照してください。
ビュー、具体化されたビュー、ストリーミング テーブルを使用する場合
パイプライン クエリを実装する場合は、最適なデータセットの種類を選択して、効率的で保守可能であることを確認します。
ビューを使用して次の操作を行うことを検討してください。
- 必要な大規模または複雑なクエリを、管理しやすいクエリに分割します。
- 期待値を使用して中間結果を検証します。
- 永続化する必要のない結果のストレージコストとコンピューティング コストを削減します。 テーブルは具体化されているため、追加の計算とストレージ リソースが必要です。
次の場合は、具体化されたビューの使用を検討してください。
- 複数のダウンストリーム クエリがテーブルを消費する。 ビューはオンデマンドで計算されるため、ビューをクエリするたびにビューは再計算されます。
- その他のパイプライン、ジョブ、クエリでテーブルを使用する。 ビューは具体化されていないため、同じパイプラインの中でしか使用できません。
- 開発中にクエリの結果を見たい場合。 テーブルは具体化されており、閲覧や照会をパイプラインの外部でできるため、開発中にテーブルを使用することで計算の正しさを検証するのに役立ちます。 検証後、具体化を必要としないクエリをビューに変換します。
次の場合は、ストリーミング テーブルの使用を検討してください。
- クエリが、継続的またはインクリメンタルに増加しているデータ ソースに対して定義されている。
- クエリ結果はインクリメンタルに計算する必要がある。
- パイプラインには、高スループットと低待機時間が必要です。
Note
ストリーミング テーブルは、常にストリーミング ソースに対して定義されます。 APPLY CHANGES INTO
でストリーミング ソースを使用して、CDC フィードから更新を適用することもできます。 APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化に関する記事を参照してください。
ターゲット スキーマからテーブルを除外する
外部使用を意図していない中間テーブルを計算する必要がある場合は、 TEMPORARY
キーワードを使用して、テーブルがスキーマにパブリッシュされないようにすることができます。 一時テーブルは引き続き Delta Live Tables セマンティクスに従ってデータを格納および処理しますが、現在のパイプラインの外部にはアクセスしないでください。 一時テーブルは、それを作成するパイプラインの有効期間中保持されます。 一時テーブルを宣言するには、次の構文を使用します。
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
ストリーミング テーブルと具体化されたビューを 1 つのパイプラインに結合する
ストリーミング テーブルは、Apache Spark 構造化ストリーミングの処理の保証を継承し、追加専用のデータ ソースからのクエリを処理するように構成されています。ここでは、新しい行は変更されるのではなく、常にソース テーブルに挿入されます。
Note
ストリーミング テーブルには既定では追加専用のデータ ソースが必要とされますが、更新や削除が発生するような別のストリーミング テーブルをストリーミング ソースにする場合は、その動作をskipChangeCommits フラグでオーバーライドできます。
一般的なストリーミング パターンには、ソース データを取り込み、パイプラインに初期データセットを作成することが含まれます。 これらの初期データセットは、一般的にブロンズ テーブルと呼ばれ、多くの場合、単純な変換を実行します。
これに対し、パイプラインの最終的なテーブル (一般にゴールド テーブルと呼ばれます) では、複雑な集計や APPLY CHANGES INTO
操作のターゲットからの読み取りが必要になることがよくあります。 これらの操作は本質的に追加ではなく更新を作成するため、ストリーミング テーブルへの入力としてサポートされていません。 これらの変換は、具体化されたビューに適しています。
ストリーミング テーブルと具体化されたビューを 1 つのパイプラインに組み合わせると、パイプラインを簡素化し、コストがかかる生データの再取り込みまたは再処理を避け、SQL の全機能を活用して効率的にエンコードおよびフィルター処理されたデータセットに対して複雑な集計を計算することができます。 次の例は、この種類の混合処理を示しています。
Note
これらの例では、自動ローダーを使用してクラウド ストレージからファイルを読み込みます。 Unity Catalog が有効になったパイプラインで自動ローダーを使用してファイルを読み込むには、外部の場所を使用する必要があります。 Delta Live Tables での Unity Catalog の使用の詳細については、「Delta Live Tables パイプラインでの Unity Catalog の使用」を参照してください。
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("LIVE.streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
Auto Loader を使用して Azure Storage から JSON ファイルを増分的に取り込む方法について説明します。
ストリーム静的結合
ストリーム静的結合は、主に静的なディメンション テーブルを使用して追加専用のデータの連続したストリームを非正規化する場合に最適な選択肢です。
パイプラインが更新されるたびに、ストリームからの新しいレコードが静的テーブルの最新のスナップショットと結合されます。 ストリーミング テーブルからの対応するデータが処理された後に、レコードが静的テーブルで追加または更新された場合は、完全な更新が実行されない限り、結果のレコードは再計算されません。
トリガーされた実行に対して構成されたパイプラインでは、静的テーブルは更新が開始された時点での結果を返します。 継続的実行用に構成されたパイプラインでは、テーブルが更新を処理するたびに、最新バージョンの静的テーブルが照会されます。
ストリーム静的結合の例を以下に示します。
Python
@dlt.table
def customer_sales():
return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
集計を効率的に計算する
カウント、最小、最大、合計などの単純な分散集計や、平均または標準偏差などの代数集計も、ストリーミング テーブルを使用してインクリメンタルに計算できます。 Databricks では、 GROUP BY country
句を含むクエリなど、グループの数が限られているクエリに対して増分集計を推奨します。 更新ごとに新しい入力データだけが読み取られます。
増分集計を実行する Delta Live Tables クエリの作成の詳細については、ウォーターマークを使用したウィンドウ集計の実行に関する記事を参照してください。
Delta Live Tables パイプラインで MLFlow モデルを使用する
Note
Unity Catalog 対応パイプラインで MLflow モデルを使うには、preview
チャネルを使うようにパイプラインを構成する必要があります。 current
チャネルを使うには、Hive メタストアに公開するようにパイプラインを構成する必要があります。
Delta Live Tables パイプラインでは、MLflow でトレーニング済みのモデルを使用できます。 MLflow モデルは Azure Databricks で変換として扱われます。つまり、Spark DataFrame の入力に基づいて動作し、結果を Spark DataFrame として返します。 Delta Live Tables は DataFrame に対してデータセットを定義するため、MLflow を使用する Apache Spark ワークロードを、わずか数行のコードで Delta Live Tables に変換できます。 MLflow の詳細については、「MLflow を使用した機械学習ライフサイクル管理」を参照してください。
MLflow モデルを呼び出す Python ノートブックが既にある場合は、@dlt.table
デコレーターを使用して、変換結果を返すように関数が定義されていることを確認すると、このコードを Delta Live Tables に適合させることができます。 Delta Live Tables では MLflow が既定でインストールされないため、 %pip install mlflow
を使用して MLFlow ライブラリがインストールされ、ノートブックの上部に mlflow
と dlt
がインポートされていることを確認します。 Delta Live Tables 構文の概要については、Python を使用した Develop パイプライン コードを参照してください。
Delta Live Tables で MLflow モデルを使用するには、次の手順を実行します。
- MLflow モデルの実行 ID とモデル名を取得します。 実行 ID とモデル名は、MLflow モデルの URI の構築に使用されます。
- この URI を使用して、MLflow モデルを読み込む Spark UDF を定義します。
- テーブル定義で UDF を呼び出して、MLflow モデルを使用します。
次の例は、このパターンの基本的な構文を示しています。
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
完全な例として、次のコードでは、ローン リスク データでトレーニングされた MLflow モデルを読み込む loaded_model_udf
という名前の Spark UDF を定義します。 予測を行うために使用するデータ列は、引数として UDF に渡されます。 テーブル loan_risk_predictions
では、loan_risk_input_data
の各行の予測を計算します。
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
手動での削除または更新の保持
Delta Live Tables を使用すると、テーブルからレコードを手動で削除または更新し、更新操作を実行してダウンストリーム テーブルを再計算することができます。
既定では、パイプラインが更新されるたびに Delta Live Tables で入力データに基づいてテーブルの結果が再計算されるため、削除されたレコードがソース データから再度読み込まれないようにする必要があります。 pipelines.reset.allowed
テーブル プロパティを false
に設定すると、テーブルへの更新はできなくなりますが、テーブルへの増分書き込みまたは新しいデータがテーブルに流れるのを防ぐことはありません。
次の図は、2 つのストリーミング テーブルを使用した例を示しています。
raw_user_table
でソースから生のユーザー データを取り込む。bmi_table
は、raw_user_table
の体重と身長を使用して BMI スコアをインクリメンタルに計算する。
raw_user_table
からユーザー レコードを手動で削除または更新し、bmi_table
を再計算する必要があります。
次のコードでは、pipelines.reset.allowed
テーブル プロパティを false
に設定し、raw_user_table
の完全更新を無効にして、意図した変更が経時的に維持されるようにしますが、パイプラインの更新の実行時にダウンストリーム テーブルが再計算されることを示しています。
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);