Azure Databricks で最初の ETL ワークロードを実行する
Azure Databricks の運用環境向けのツールを使用して、データ オーケストレーション用の最初の抽出、変換、読み込み (ETL) パイプラインを開発してデプロイする方法について説明します。
この記事を読み終えるころには、次のような作業が快適に感じることでしょう。
- Databricks の汎用コンピューティング クラスターを起動する。
- Databricks ノートブックを作成する。
- 自動ローダーを使用して Delta Lake への増分データ インジェストを構成する。
- ノートブック セルを実行してデータを処理、照会、プレビューする。
- ノートブックを Databricks ジョブとしてスケジュールする。
このチュートリアルでは、対話型ノートブックを使用して Python または Scala から一般的な ETL タスクを実行します。
Delta Live Tables を使用して ETL パイプラインを構築することもできます。 Databricks は、運用 ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために Delta Live Tables を作成しました。 「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。
Databricks Terraform プロバイダーを使用して、この記事のリソースを作成することもできます。 「Terraform を使用してクラスター、ノートブック、ジョブを作成する」を参照してください。
必要条件
- Azure Databricks ワークスペースにログインしています。
- クラスターを作成するためのアクセス許可があります。
Note
以降の手順の大半は、クラスターの制御権限がなくても、クラスターへのアクセス権があれば実行できます。
手順 1: クラスターを作成する
探索的データ分析やデータ エンジニアリングを行うには、コマンドの実行に必要なコンピューティング リソースを提供するクラスターを作成します。
- サイド バーで [コンピューティング] をクリックします。
- [コンピューティング] ページの [クラスターの作成] をクリックします。 [新しいクラスター] ページが表示されます。
- クラスターの一意の名前を指定し、残りの値を既定の状態のままにして、[クラスターの作成] をクリックします。
Databricks クラスターの詳細については、「コンピューティング」を参照してください。
手順 2: Databricks ノートブックを作成する
ワークスペースにノートブックを作成するには、サイド バーの [新規] をクリックしてから、[ノートブック] をクリックします。 ワークスペースに空のノートブックが開きます。
ノートブックの作成と管理について詳しくは、「Notebooks を管理する」を参照してください。
手順 3: Delta Lake にデータを取り込むように自動ローダーを構成する
Databricks では、自動ローダーを使用した増分データ インジェストが推奨されています。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。
Databricks はデータの格納に Delta Lake を使用することを推奨しています。 Delta Lake はオープンソースのストレージ レイヤーです。ACID トランザクションを備え、データ レイクハウスを実現します。 Databricks に作成されるテーブルの既定の形式は Delta Lake です。
Delta Lake テーブルにデータを取り込むように自動ローダーを構成するには、次のコードをコピーして、ノートブックの空のセルに貼り付けます。
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
注意
このコードで定義されている変数を使用すれば、既存のワークスペース資産や他のユーザーと競合することなく安全にコードを実行できます。 ネットワークまたはストレージのアクセス許可が制限されている場合は、このコードを実行するとエラーが発生します。ワークスペース管理者に連絡して、それらの制限のトラブルシューティングを行ってください。
自動ローダーの詳細については、「自動ローダー」を参照してください。
手順 4: データを処理して対話的に操作する
ノートブックでは、ロジックがセル単位で実行されます。 セル内のロジックを実行するには:
前の手順で完成したセルを実行するために、そのセルを選択し、Shift + Enter キーを押します。
作成されたテーブルに対してクエリを実行するために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。
Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
DataFrame 内のデータをプレビューするために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。
Python
display(df)
Scala
display(df)
データを対話的に視覚化する方法について詳しくは、「Databricks ノートブックでの視覚化」を参照してください。
手順 5: ジョブをスケジュールする
Databricks ノートブックを運用スクリプトとして実行するには、それらをタスクとして Databricks ジョブに追加します。 この手順では、手動でトリガーできる新しいジョブを作成します。
ノートブックをタスクとしてスケジュールするには:
- ヘッダー バーの右側にある [スケジュール] をクリックします。
- [ジョブ名] に一意の名前を入力します。
- [手動] をクリックします。
- [クラスター] ボックスの一覧から、手順 1. で作成したクラスターを選択します。
- [作成] をクリックします。
- 表示されたウィンドウで、[今すぐ実行] をクリックします。
- ジョブの実行結果を確認するには、前回実行タイムスタンプの横にある アイコンをクリックします。
ジョブの詳細については、「Databricks ジョブとは」を参照してください。
その他の統合
Azure Databricks を使用したデータ エンジニアリングのための統合とツールについては、以下の記事を参照してください。