次の方法で共有


チュートリアル: 最初の Delta Live Tables パイプラインを実行する

このチュートリアルでは、最初の Delta Live Tables パイプラインを構成し、基本的な ETL コードを記述し、パイプラインの更新を実行する手順について説明します。

このチュートリアルのすべての手順は、Unity カタログが有効になっているワークスペース用に設計されています。 従来の Hive メタストアと連携するように Delta Live Tables パイプラインを構成することもできます。 「 レガシ Hive メタストアで Delta Live Tables パイプラインを使用するを参照してください。

Note

このチュートリアルでは、Databricks ノートブックを使用して新しいパイプライン コードを開発および検証する手順について説明します。 Python または SQL ファイルのソース コードを使用してパイプラインを構成することもできます。

Delta Live Tables 構文を使用してソース コードが既に記述されている場合は、コードを実行するようにパイプラインを構成できます。 「 デルタ ライブ テーブル パイプラインを構成するを参照してください。

Databricks SQL で完全宣言型 SQL 構文を使用して、具体化されたビューとストリーミング テーブルの更新スケジュールを Unity カタログマネージド オブジェクトとして登録および設定できます。 「 Databricks SQL で具体化されたビューを使用するおよび Databricks SQL のストリーミング テーブルを使用してデータを読み込むを参照してください。

例: ニューヨークの赤ちゃんの名前のデータを取り込んで処理する

この記事の例では、ニューヨーク州の赤ちゃんの名前 のレコードを含む一般公開されているデータセットを使います。 この例では、Delta Live Tables パイプラインを使用して次の操作を行います。

  • ボリュームからテーブルに生の CSV データを読み取ります。
  • インジェスト テーブルからレコードを読み取り、Delta Live Tables expectations を使用して、クレンジングされたデータを含む新しいテーブルを作成します。
  • 派生データセットを作成する Delta Live Tables クエリへの入力として、クレンジング済みのレコードを使う。

このコードは、メダリオン アーキテクチャのシンプルな例を示しています。 「メダリオン レイクハウス アーキテクチャとは」を参照してください。

この例の実装は、Python と SQL に対して提供されます。 手順に従って新しいパイプラインとノートブックを作成し、指定されたコードをコピーして貼り付けます。

完全なコードを含む ノートブック の例も提供されています。

要件

  • パイプラインを開始するには、クラスター作成のアクセス許可、または Delta Live Tables クラスターを定義するクラスター ポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムでは、パイプラインを実行する前にクラスターを作成します。ユーザーが適切なアクセス許可を持たない場合、これは失敗します。

  • 既定では、すべてのユーザーがサーバーレス パイプラインを使用して更新をトリガーできます。 サーバーレスはアカウント レベルで有効にする必要があり、ワークスペース リージョンでは使用できない場合があります。 「サーバーレス コンピューティングを有効にする」をご覧ください。

  • このチュートリアルの例では、 Unity Catalog を使用します。 Databricks では、ターゲット スキーマに複数のデータベース オブジェクトが作成されるため、このチュートリアルを実行するための新しいスキーマを作成することをお勧めします。

    • カタログに新しいスキーマを作成するには、 ALL PRIVILEGES または USE CATALOGCREATE SCHEMA 権限が必要です。
    • 新しいスキーマを作成できない場合は、既存のスキーマに対してこのチュートリアルを実行します。 次の特権が必要です。
      • USE CATALOG 親カタログの場合は 〗。
      • ALL PRIVILEGES または、ターゲット スキーマに対する USE SCHEMACREATE MATERIALIZED VIEW、および CREATE TABLE 特権。
    • このチュートリアルでは、ボリュームを使用してサンプル データを格納します。 Databricks では、このチュートリアル用に新しいボリュームを作成することをお勧めします。 このチュートリアル用に新しいスキーマを作成する場合は、そのスキーマに新しいボリュームを作成できます。
      • 既存のスキーマに新しいボリュームを作成するには、次の権限が必要です。
        • USE CATALOG 親カタログの場合は 〗。
        • ALL PRIVILEGES またはターゲット スキーマに対する USE SCHEMA および CREATE VOLUME 権限。
      • 必要に応じて、既存のボリュームを使用できます。 次の特権が必要です。
        • USE CATALOG 親カタログの場合は 〗。
        • USE SCHEMA 親スキーマの場合は 〗。
        • ALL PRIVILEGES またはターゲット ボリュームに READ VOLUME して WRITE VOLUME します。

    これらのアクセス許可を設定するには、Databricks 管理者に問い合わせてください。 Unity カタログ権限の詳細については、「 Unity Catalog 権限とセキュリティ保護可能なオブジェクトを参照してください。

手順 0: データをダウンロードする

この例では、Unity カタログ ボリュームからデータを読み込みます。 次のコードは、CSV ファイルをダウンロードし、指定されたボリュームに格納します。 新しいノートブックを開き、次のコードを実行して、このデータを指定されたボリュームにダウンロードします。

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

<catalog-name><schema-name><volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。 指定されたコードは、これらのオブジェクトが存在しない場合に、指定されたスキーマとボリュームの作成を試みます。 Unity カタログ内のオブジェクトを作成および書き込むには、適切な権限が必要です。 「 要件」を参照してください。

Note

チュートリアルを続行する前に、このノートブックが正常に実行されていることを確認します。 パイプラインの一部としてこのノートブックを構成しないでください。

手順 1: パイプラインを作成する

Delta Live Tables では、Delta Live Tables 構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義されている依存関係を解決することで、パイプラインが作成されます。 各ソース コード ファイルに含めることができる言語は 1 つだけですが、パイプラインに複数の言語固有のノートブックまたはファイルを追加できます。

重要

Source code フィールドにアセットを構成しないでください。 このフィールドを黒のままにすると、ソース コードの作成用にノートブックが作成され、構成されます。

このチュートリアルの手順では、サーバーレス コンピューティングと Unity カタログを使用します。 これらの手順で説明されていないすべての構成オプションには、既定の設定を使用します。

Note

ワークスペースでサーバーレスが有効になっていないか、サポートされていない場合は、既定のコンピューティング設定を使用して記述されたチュートリアルを完了できます。 Create パイプライン UI の Destination セクションの Storage オプションで、手動で Unity Catalog を選択する必要があります。

新しいパイプラインを構成するには、次の操作を行います。

  1. サイドバーの [ Delta ライブ テーブル をクリックします。
  2. [パイプライン 作成] をクリックします。
  3. 一意の Pipeline 名を指定します。
  4. Serverless の横にあるチェック ボックスをオンにします。
  5. Catalog を選択してデータを発行します。
  6. カタログ内の Schema を選択します。
    • スキーマを作成する新しいスキーマ名を指定します。
  7. Advanced の下にある Add configuration ボタンを使用して 3 つのパイプライン パラメーターを定義し、3 つの構成を追加します。 次のパラメーター名を使用して、データをダウンロードしたカタログ、スキーマ、およびボリュームを指定します。
    • my_catalog
    • my_schema
    • my_volume
  8. Create をクリックしてください。

新しく作成されたパイプラインのパイプライン UI が表示されます。 ソース コード ノートブックが自動的に作成され、パイプライン用に構成されます。

ノートブックは、ユーザー ディレクトリ内の新しいディレクトリに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、/Users/your.username@databricks.com/my_pipeline/my_pipeline のようにします。

このノートブックにアクセスするためのリンクは、Pipeline の詳細 パネルの Source コード フィールドにあります。 リンクをクリックしてノートブックを開き、次の手順に進みます。

手順 2: Python または SQL を使用してノートブックで具体化されたビューとストリーミング テーブルを宣言する

Datbricks ノートブックを使用して、Delta Live Tables パイプラインのソース コードを対話的に開発および検証できます。 この機能を使用するには、ノートブックをパイプラインにアタッチする必要があります。 新しく作成したノートブックを、先ほど作成したパイプラインにアタッチするには:

  1. 右上の Connect をクリックして、コンピューティング構成メニューを開きます。
  2. 手順 1 で作成したパイプラインの名前にカーソルを合わせます。
  3. [Connect] をクリックします。

UI が変更され、右上に Validate Start ボタンが含まれます。 パイプライン コード開発のノートブック サポートの詳細については、「ノートブックでの Delta Live Tables パイプラインの開発とデバッグを参照してください。

重要

  • Delta Live Tables パイプラインは、計画中にノートブック内のすべてのセルを評価します。 汎用コンピューティングに対して実行されるノートブックやジョブとしてスケジュールされたノートブックとは異なり、パイプラインでは、セルが指定された順序で実行されるとは限りません。
  • ノートブックに含めることができるプログラミング言語は 1 つだけです。 パイプライン ソース コード ノートブックで Python と SQL コードを混在させないでください。

Python または SQL を使用したコードの開発の詳細については、「Develop pipeline code with Python または SQL を使用したパイプライン コードの開発を参照してください。

パイプライン コードの例

このチュートリアルの例を実装するには、次のコードをコピーして、パイプラインのソース コードとして構成されたノートブック内のセルに貼り付けます。

指定されたコードでは、次の処理が行われます。

  • 必要なモジュールをインポートします (Python のみ)。
  • パイプラインの構成中に定義されたパラメーターを参照します。
  • ボリュームから取り込む baby_names_raw という名前のストリーミング テーブルを定義します。
  • 取り込まれたデータを検証する baby_names_prepared という名前の具体化されたビューを定義します。
  • データの高度に洗練されたビューを持つ top_baby_names_2021 という名前の具体化されたビューを定義します。

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

手順 3: パイプラインの更新を開始する

パイプラインの更新を開始するには、ノートブック UI の右上にある Start ボタンをクリックします。

ノートブックの例

次のノートブックには、この記事で提供されているのと同じコード例が含まれています。 これらのノートブックには、この記事の手順と同じ要件があります。 「 要件」を参照してください。

ノートブックをインポートするには、次の手順を実行します。

  1. ノートブック UI を開きます。
    • [ + 新規>Notebook をクリックします。
    • 空のノートブックが開きます。
  2. [ファイル]>[インポート] をクリックします。 [インポート] ダイアログが表示されます。
  3. Import fromURL オプションを選択します。
  4. ノートブックの URL を貼り付けます。
  5. [インポート] をクリックします。

このチュートリアルでは、Delta Live Tables パイプラインを構成して実行する前に、データ セットアップ ノートブックを実行する必要があります。 次のノートブックをインポートし、ノートブックをコンピューティング リソースにアタッチし、 my_catalogmy_schemamy_volumeに必要な変数を入力して、 [すべて実行] をクリック

パイプラインのデータダウンロードチュートリアル

ノートブックを入手

次のノートブックは、Python または SQL の例を示しています。 ノートブックをインポートすると、ユーザーのホーム ディレクトリに保存されます。

以下のいずれかのノートブックをインポートした後、パイプラインを作成する手順を完了しますが、 Source コード ファイル ピッカーを使用してダウンロードしたノートブックを選択します。 ソース コードとして構成されたノートブックを使用してパイプラインを作成した後、パイプライン UI で Start をクリックして更新をトリガーします。

Delta Live Tables Python ノートブックで作業を開始する

ノートブックを入手

Delta Live Tables SQL ノートブックで作業を開始する

ノートブックを入手