チュートリアル: 最初の Delta Live Tables パイプラインを実行する
このチュートリアルでは、最初の Delta Live Tables パイプラインを構成し、基本的な ETL コードを記述し、パイプラインの更新を実行する手順について説明します。
このチュートリアルのすべての手順は、Unity カタログが有効になっているワークスペース用に設計されています。 従来の Hive メタストアと連携するように Delta Live Tables パイプラインを構成することもできます。 「 レガシ Hive メタストアで Delta Live Tables パイプラインを使用するを参照してください。
Note
このチュートリアルでは、Databricks ノートブックを使用して新しいパイプライン コードを開発および検証する手順について説明します。 Python または SQL ファイルのソース コードを使用してパイプラインを構成することもできます。
Delta Live Tables 構文を使用してソース コードが既に記述されている場合は、コードを実行するようにパイプラインを構成できます。 「 デルタ ライブ テーブル パイプラインを構成するを参照してください。
Databricks SQL で完全宣言型 SQL 構文を使用して、具体化されたビューとストリーミング テーブルの更新スケジュールを Unity カタログマネージド オブジェクトとして登録および設定できます。 「 Databricks SQL で具体化されたビューを使用するおよび Databricks SQL のストリーミング テーブルを使用してデータを読み込むを参照してください。
例: ニューヨークの赤ちゃんの名前のデータを取り込んで処理する
この記事の例では、ニューヨーク州の赤ちゃんの名前 のレコードを含む一般公開されているデータセットを使います。 この例では、Delta Live Tables パイプラインを使用して次の操作を行います。
- ボリュームからテーブルに生の CSV データを読み取ります。
- インジェスト テーブルからレコードを読み取り、Delta Live Tables expectations を使用して、クレンジングされたデータを含む新しいテーブルを作成します。
- 派生データセットを作成する Delta Live Tables クエリへの入力として、クレンジング済みのレコードを使う。
このコードは、メダリオン アーキテクチャのシンプルな例を示しています。 「メダリオン レイクハウス アーキテクチャとは」を参照してください。
この例の実装は、Python と SQL に対して提供されます。 手順に従って新しいパイプラインとノートブックを作成し、指定されたコードをコピーして貼り付けます。
完全なコードを含む ノートブック の例も提供されています。
要件
パイプラインを開始するには、クラスター作成のアクセス許可、または Delta Live Tables クラスターを定義するクラスター ポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムでは、パイプラインを実行する前にクラスターを作成します。ユーザーが適切なアクセス許可を持たない場合、これは失敗します。
既定では、すべてのユーザーがサーバーレス パイプラインを使用して更新をトリガーできます。 サーバーレスはアカウント レベルで有効にする必要があり、ワークスペース リージョンでは使用できない場合があります。 「サーバーレス コンピューティングを有効にする」をご覧ください。
このチュートリアルの例では、 Unity Catalog を使用します。 Databricks では、ターゲット スキーマに複数のデータベース オブジェクトが作成されるため、このチュートリアルを実行するための新しいスキーマを作成することをお勧めします。
- カタログに新しいスキーマを作成するには、
ALL PRIVILEGES
またはUSE CATALOG
とCREATE SCHEMA
権限が必要です。 - 新しいスキーマを作成できない場合は、既存のスキーマに対してこのチュートリアルを実行します。 次の特権が必要です。
USE CATALOG
親カタログの場合は 〗。ALL PRIVILEGES
または、ターゲット スキーマに対するUSE SCHEMA
、CREATE MATERIALIZED VIEW
、およびCREATE TABLE
特権。
- このチュートリアルでは、ボリュームを使用してサンプル データを格納します。 Databricks では、このチュートリアル用に新しいボリュームを作成することをお勧めします。 このチュートリアル用に新しいスキーマを作成する場合は、そのスキーマに新しいボリュームを作成できます。
- 既存のスキーマに新しいボリュームを作成するには、次の権限が必要です。
USE CATALOG
親カタログの場合は 〗。ALL PRIVILEGES
またはターゲット スキーマに対するUSE SCHEMA
およびCREATE VOLUME
権限。
- 必要に応じて、既存のボリュームを使用できます。 次の特権が必要です。
USE CATALOG
親カタログの場合は 〗。USE SCHEMA
親スキーマの場合は 〗。ALL PRIVILEGES
またはターゲット ボリュームにREAD VOLUME
してWRITE VOLUME
します。
- 既存のスキーマに新しいボリュームを作成するには、次の権限が必要です。
これらのアクセス許可を設定するには、Databricks 管理者に問い合わせてください。 Unity カタログ権限の詳細については、「 Unity Catalog 権限とセキュリティ保護可能なオブジェクトを参照してください。
- カタログに新しいスキーマを作成するには、
手順 0: データをダウンロードする
この例では、Unity カタログ ボリュームからデータを読み込みます。 次のコードは、CSV ファイルをダウンロードし、指定されたボリュームに格納します。 新しいノートブックを開き、次のコードを実行して、このデータを指定されたボリュームにダウンロードします。
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
<catalog-name>
、<schema-name>
、<volume-name>
を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。 指定されたコードは、これらのオブジェクトが存在しない場合に、指定されたスキーマとボリュームの作成を試みます。 Unity カタログ内のオブジェクトを作成および書き込むには、適切な権限が必要です。 「 要件」を参照してください。
Note
チュートリアルを続行する前に、このノートブックが正常に実行されていることを確認します。 パイプラインの一部としてこのノートブックを構成しないでください。
手順 1: パイプラインを作成する
Delta Live Tables では、Delta Live Tables 構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義されている依存関係を解決することで、パイプラインが作成されます。 各ソース コード ファイルに含めることができる言語は 1 つだけですが、パイプラインに複数の言語固有のノートブックまたはファイルを追加できます。
重要
Source code フィールドにアセットを構成しないでください。 このフィールドを黒のままにすると、ソース コードの作成用にノートブックが作成され、構成されます。
このチュートリアルの手順では、サーバーレス コンピューティングと Unity カタログを使用します。 これらの手順で説明されていないすべての構成オプションには、既定の設定を使用します。
Note
ワークスペースでサーバーレスが有効になっていないか、サポートされていない場合は、既定のコンピューティング設定を使用して記述されたチュートリアルを完了できます。 Create パイプライン UI の Destination セクションの Storage オプションで、手動で Unity Catalog を選択する必要があります。
新しいパイプラインを構成するには、次の操作を行います。
- サイドバーの [ Delta ライブ テーブル をクリックします。
- [パイプライン 作成] をクリックします。
- 一意の Pipeline 名を指定します。
- Serverless の横にあるチェック ボックスをオンにします。
- Catalog を選択してデータを発行します。
- カタログ内の Schema を選択します。
- スキーマを作成する新しいスキーマ名を指定します。
- Advanced の下にある Add configuration ボタンを使用して 3 つのパイプライン パラメーターを定義し、3 つの構成を追加します。 次のパラメーター名を使用して、データをダウンロードしたカタログ、スキーマ、およびボリュームを指定します。
my_catalog
my_schema
my_volume
- Create をクリックしてください。
新しく作成されたパイプラインのパイプライン UI が表示されます。 ソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。
ノートブックは、ユーザー ディレクトリ内の新しいディレクトリに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、/Users/your.username@databricks.com/my_pipeline/my_pipeline
のようにします。
このノートブックにアクセスするためのリンクは、Pipeline の詳細 パネルの Source コード フィールドにあります。 リンクをクリックしてノートブックを開き、次の手順に進みます。
手順 2: Python または SQL を使用してノートブックで具体化されたビューとストリーミング テーブルを宣言する
Datbricks ノートブックを使用して、Delta Live Tables パイプラインのソース コードを対話的に開発および検証できます。 この機能を使用するには、ノートブックをパイプラインにアタッチする必要があります。 新しく作成したノートブックを、先ほど作成したパイプラインにアタッチするには:
- 右上の Connect をクリックして、コンピューティング構成メニューを開きます。
- 手順 1 で作成したパイプラインの名前にカーソルを合わせます。
- [Connect] をクリックします。
UI が変更され、右上に Validate Start ボタンが含まれます。 パイプライン コード開発のノートブック サポートの詳細については、「ノートブックでの Delta Live Tables パイプラインの開発とデバッグを参照してください。
重要
- Delta Live Tables パイプラインは、計画中にノートブック内のすべてのセルを評価します。 汎用コンピューティングに対して実行されるノートブックやジョブとしてスケジュールされたノートブックとは異なり、パイプラインでは、セルが指定された順序で実行されるとは限りません。
- ノートブックに含めることができるプログラミング言語は 1 つだけです。 パイプライン ソース コード ノートブックで Python と SQL コードを混在させないでください。
Python または SQL を使用したコードの開発の詳細については、「Develop pipeline code with Python または SQL を使用したパイプライン コードの開発を参照してください。
パイプライン コードの例
このチュートリアルの例を実装するには、次のコードをコピーして、パイプラインのソース コードとして構成されたノートブック内のセルに貼り付けます。
指定されたコードでは、次の処理が行われます。
- 必要なモジュールをインポートします (Python のみ)。
- パイプラインの構成中に定義されたパラメーターを参照します。
- ボリュームから取り込む
baby_names_raw
という名前のストリーミング テーブルを定義します。 - 取り込まれたデータを検証する
baby_names_prepared
という名前の具体化されたビューを定義します。 - データの高度に洗練されたビューを持つ
top_baby_names_2021
という名前の具体化されたビューを定義します。
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
手順 3: パイプラインの更新を開始する
パイプラインの更新を開始するには、ノートブック UI の右上にある Start ボタンをクリックします。
ノートブックの例
次のノートブックには、この記事で提供されているのと同じコード例が含まれています。 これらのノートブックには、この記事の手順と同じ要件があります。 「 要件」を参照してください。
ノートブックをインポートするには、次の手順を実行します。
- ノートブック UI を開きます。
- [ + 新規>Notebook をクリックします。
- 空のノートブックが開きます。
- [ファイル]>[インポート] をクリックします。 [インポート] ダイアログが表示されます。
- Import from の URL オプションを選択します。
- ノートブックの URL を貼り付けます。
- [インポート] をクリックします。
このチュートリアルでは、Delta Live Tables パイプラインを構成して実行する前に、データ セットアップ ノートブックを実行する必要があります。 次のノートブックをインポートし、ノートブックをコンピューティング リソースにアタッチし、 my_catalog
、 my_schema
、 my_volume
に必要な変数を入力して、 [すべて実行] をクリック。
パイプラインのデータダウンロードチュートリアル
次のノートブックは、Python または SQL の例を示しています。 ノートブックをインポートすると、ユーザーのホーム ディレクトリに保存されます。
以下のいずれかのノートブックをインポートした後、パイプラインを作成する手順を完了しますが、 Source コード ファイル ピッカーを使用してダウンロードしたノートブックを選択します。 ソース コードとして構成されたノートブックを使用してパイプラインを作成した後、パイプライン UI で Start をクリックして更新をトリガーします。