次の方法で共有


Terraform を使用してクラスター、ノートブック、ジョブを作成する

この記事では、Databricks Terraform プロバイダーを使用して、既存の Azure Databricks ワークスペースクラスターノートブックジョブを作成する方法について説明します。

この記事は、次の Azure Databricks の概要に関する記事に関連しています。

この記事の Terraform 構成を調整して、ワークスペースにカスタムのクラスター、ノートブック、ジョブを作成することもできます。

手順 1: Terraform プロジェクトを作成して構成する

  1. Databricks Terraform プロバイダーの概要に関する記事の「要件」セクションの手順に従って、Terraform プロジェクトを作成します。

  2. クラスターを作成するには、cluster.tf という名前のファイルを作成し、次の内容をファイルに追加します。 この内容により、許可されるリソースの量が最も少ないクラスターが作成されます。 このクラスターでは、最新の Databricks Runtime 長期サポート (LTS) バージョンが使用されます。

    Unity カタログで動作するクラスターの場合:

    variable "cluster_name" {}
    variable "cluster_autotermination_minutes" {}
    variable "cluster_num_workers" {}
    variable "cluster_data_security_mode" {}
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
      data_security_mode      = var.cluster_data_security_mode
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    

    汎用クラスターの場合:

    variable "cluster_name" {
      description = "A name for the cluster."
      type        = string
      default     = "My Cluster"
    }
    
    variable "cluster_autotermination_minutes" {
      description = "How many minutes before automatically terminating due to inactivity."
      type        = number
      default     = 60
    }
    
    variable "cluster_num_workers" {
      description = "The number of workers."
      type        = number
      default     = 1
    }
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    
  3. クラスターを作成するには、cluster.auto.tfvars という名前の別のファイルを作成し、次の内容をファイルに追加します。 このファイルには、クラスターをカスタマイズするための変数値が含まれます。 プレースホルダーの値は、実際の値に置き換えます。

    Unity カタログで動作するクラスターの場合:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    cluster_data_security_mode      = "SINGLE_USER"
    

    汎用クラスターの場合:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    
  4. ノートブックを作成するには、notebook.tf という名前の別のファイルを作成し、次の内容をファイルに追加します。

    variable "notebook_subdirectory" {
      description = "A name for the subdirectory to store the notebook."
      type        = string
      default     = "Terraform"
    }
    
    variable "notebook_filename" {
      description = "The notebook's filename."
      type        = string
    }
    
    variable "notebook_language" {
      description = "The language of the notebook."
      type        = string
    }
    
    resource "databricks_notebook" "this" {
      path     = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}"
      language = var.notebook_language
      source   = "./${var.notebook_filename}"
    }
    
    output "notebook_url" {
     value = databricks_notebook.this.url
    }
    
  5. クラスターを作成する場合は、次のノートブック コードを、notebook.tf ファイルと同じディレクトリ内のファイルに保存します。

    チュートリアルのPython ノートブックについて: 次の内容の notebook-getting-started-lakehouse-e2e.py という名前のファイルである、エンド ツー エンド レイクハウス分析パイプライン、を実行します:

    # Databricks notebook source
    external_location = "<your_external_location>"
    catalog = "<your_catalog>"
    
    dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True)
    display(dbutils.fs.head(f"{external_location}/foobar.txt"))
    dbutils.fs.rm(f"{external_location}/foobar.txt")
    
    display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
    
    # COMMAND ----------
    
    from pyspark.sql.functions import col
    
    # Set parameters for isolation in workspace and reset demo
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"{catalog}.e2e_lakehouse_{username}_db"
    source = f"{external_location}/e2e-lakehouse-source"
    table = f"{database}.target_table"
    checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    # Clear out data from previous demo execution
    dbutils.fs.rm(source, True)
    dbutils.fs.rm(checkpoint_path, True)
    
    # Define a class to load batches of data to source
    class LoadData:
    
      def __init__(self, source):
        self.source = source
    
      def get_date(self):
        try:
          df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
          raise Exception("Source data exhausted")
          return batch_date
    
      def get_batch(self, batch_date):
        return (
          spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )
    
      def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)
    
      def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)
    
    RawData = LoadData(source)
    
    # COMMAND ----------
    
    RawData.land_batch()
    
    # COMMAND ----------
    
    # Import functions
    from pyspark.sql.functions import col, current_timestamp
    
    # Configure Auto Loader to ingest JSON data to a Delta table
    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", checkpoint_path)
      .load(file_path)
      .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .option("mergeSchema", "true")
      .toTable(table))
    
    # COMMAND ----------
    
    df = spark.read.table(table_name)
    
    # COMMAND ----------
    
    display(df)
    

    クイックスタート: Azure portal を使用して Azure Databricks ワークスペースで Spark ジョブを実行するための Python ノートブックの場合、次の内容の notebook-quickstart-create-databricks-workspace-portal.py という名前のファイル。

    # Databricks notebook source
    blob_account_name = "azureopendatastorage"
    blob_container_name = "citydatacontainer"
    blob_relative_path = "Safety/Release/city=Seattle"
    blob_sas_token = r""
    
    # COMMAND ----------
    
    wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name,blob_relative_path)
    spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
    print('Remote blob path: ' + wasbs_path)
    
    # COMMAND ----------
    
    df = spark.read.parquet(wasbs_path)
    print('Register the DataFrame as a SQL temporary view: source')
    df.createOrReplaceTempView('source')
    
    # COMMAND ----------
    
    print('Displaying top 10 rows: ')
    display(spark.sql('SELECT * FROM source LIMIT 10'))
    
  6. ノートブックを作成する場合は、notebook.auto.tfvars という名前の別のファイルを作成し、次の内容をファイルに追加します。 このファイルには、ノートブック構成をカスタマイズするための変数値が含まれます。

    チュートリアル: エンドツーエンドの レイクハウス分析パイプラインを実行するの Python ノートブックの場合:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-getting-started-lakehouse-e2e.py"
    notebook_language     = "PYTHON"
    

    クイック スタート: Azure portal を使用して Azure Databricks ワークスペースで Spark ジョブを実行するための Python ノートブックの場合:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-quickstart-create-databricks-workspace-portal.py"
    notebook_language     = "PYTHON"
    
  7. ノートブックを作成する場合は、Azure Databricks ワークスペースで、次の手順を参照して、ノートブックを正常に実行するための要件をセットアップするようにしてください。

  8. ジョブを作成するには、job.tf という名前の別のファイルを作成し、次の内容をファイルに追加します。 この内容により、ノートブックを実行するジョブが作成されます。

    variable "job_name" {
      description = "A name for the job."
      type        = string
      default     = "My Job"
    }
    
    variable "task_key" {
      description = "A name for the task."
      type        = string
      default     = "my_task"
    }
    
    resource "databricks_job" "this" {
      name = var.job_name
      task {
        task_key = var.task_key
        existing_cluster_id = databricks_cluster.this.cluster_id
        notebook_task {
          notebook_path = databricks_notebook.this.path
        }
      }
      email_notifications {
        on_success = [ data.databricks_current_user.me.user_name ]
        on_failure = [ data.databricks_current_user.me.user_name ]
      }
    }
    
    output "job_url" {
      value = databricks_job.this.url
    }
    
  9. ジョブを作成する場合は、job.auto.tfvars という名前の別のファイルを作成し、次の内容をファイルに追加します。 このファイルには、ジョブ構成をカスタマイズするための変数値が含まれます。

    job_name = "My Job"
    task_key = "my_task"
    

ステップ 2: 構成を実行する

このステップでは、Terraform 構成を実行して、クラスター、ノートブック、ジョブを Azure Databricks ワークスペースにデプロイします。

  1. terraform validate コマンドを実行して、Terraform 構成が有効かどうかを確認します。 エラーが報告された場合は、修正し、コマンドを再度実行します。

    terraform validate
    
  2. terraform plan コマンドを実行して、Terraform の実際の実行前に、ワークスペースでの実行内容を確認します。

    terraform plan
    
  3. terraform apply コマンドを実行して、クラスター、ノートブック、およびジョブをワークスペースにデプロイします。 デプロイするかどうかを尋ねるメッセージが表示されたら、yes を入力して、Enter を押します。

    terraform apply
    

    Terraform により、プロジェクトで指定されたリソースがデプロイされます。 これらのリソース (特にクラスター) のデプロイには数分かかる場合があります。

ステップ 3: 結果を調べる

  1. クラスターを作成した場合は、terraform apply コマンドの出力で、cluster_url の横のリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。

  2. ノートブックを作成した場合は、terraform apply コマンドの出力で、notebook_url の横のリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。

    注意

    ノートブックを使用する前に、その内容をカスタマイズすることが必要な場合があります。 ノートブックをカスタマイズする方法については、関連するドキュメントを参照してください。

  3. ジョブを作成した場合は、terraform apply コマンドの出力で、job_url の横のリンクをコピーし、Web ブラウザーのアドレス バーに貼り付けます。

    注意

    ノートブックを実行する前に、その内容をカスタマイズすることが必要な場合があります。 ノートブックをカスタマイズする方法に関する関連ドキュメントについては、この記事の冒頭にあるリンクを参照してください。

  4. ジョブを作成した場合は、次のようにジョブを実行します。

    1. ジョブ ページで、[今すぐ実行] をクリック します。
    2. ジョブの実行が完了したら、ジョブ実行の結果を表示するには、ジョブ ページの [完了した実行 (過去 60 日間)] の一覧で、[開始時刻] 列の最新の時刻エントリをクリックします。 [出力] ペインには、ノートブックのコードを実行した結果が表示されます。

手順 4: クリーンアップする

このステップでは、ワークスペースから上記のリソースを削除します。

  1. terraform plan コマンドを実行して、Terraform の実際の実行前に、ワークスペースでの実行内容を確認します。

    terraform plan
    
  2. terraform destroy コマンドを実行して、クラスター、ノートブック、およびジョブをワークスペースから削除します。 削除するかどうかを尋ねるメッセージが表示されたら、yes を入力して、Enter を押します。

    terraform destroy
    

    Terraform により、プロジェクトで指定されたリソースが削除されます。