次の方法で共有


Databricks アセット バンドルのジョブにタスクを追加する

この記事では、"Databricks アセット バンドル" で Azure Databricks ジョブに追加できるさまざまな種類のタスクの例を示します。Databricks アセット バンドルとは」をご覧ください。

ほとんどのジョブ タスクの種類には、サポートされている設定の中でタスク固有のパラメーターがありますが、タスクに渡されるジョブ パラメーター 定義することもできます。 動的値参照は、ジョブ パラメーターでサポートされています。このパラメーターを使用すると、タスク間でジョブの実行に固有の値を渡すことができます。 「動的値参照とは?」を参照してください。

ジョブ タスクの設定をオーバーライドできます。 「Databricks アセット バンドルのジョブ タスク設定をオーバーライドする」を参照してください。

ヒント

Databricks CLIを使用して既存のジョブのリソース構成をすばやく生成するには、bundle generate job コマンドを使用できます。 bundle コマンドをご参照ください。

ノートブック タスク

このタスクを使用してノートブックを実行します。

次の例では、ノートブック タスクをジョブに追加し、my_job_run_id という名前のジョブ パラメーターを設定します。 デプロイするノートブックのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 このタスクは、Azure Databricks ワークスペース内のデプロイされた場所からノートブックを取得します。

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > notebook_task を参照してください。 「ジョブのノートブック タスク」を参照してください。

If/else 条件タスク

condition_task を使用すると、if/else 条件付きロジックを持つタスクをジョブに追加できます。 タスクは、他のタスクの実行を制御するために使用できる条件を評価します。 条件タスクは、クラスターの実行を必要とせず、再試行や通知をサポートしません。 if/else タスクの詳細については、「if/else タスク を使用してジョブに分岐ロジックを追加する」を参照してください。

次の例には、条件タスクとノートブック タスクが含まれています。ノートブック タスクは、ジョブの修復の数が 5 未満の場合にのみ実行されます。

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > condition_task を参照してください。

For each タスク

for_each_task を使用すると、for each ループを使用したタスクをジョブに追加できます。 このタスクは、指定されたすべての入力に対して入れ子になったタスクを実行します。 for_each_taskの詳細については、「For each タスクを使用してループで別のタスクを実行する」を参照してください。

次の例では、ジョブに for_each_task を追加します。ジョブは、別のタスクの値を順番にループし、それらを処理します。

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > for_each_task を参照してください。

Python スクリプト タスク

このタスクを使用して、Python ファイルを実行します。

次の例では、Python スクリプト タスクをジョブに追加します。 デプロイする Python ファイルのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 このタスクは、Azure Databricks ワークスペース内のデプロイされた場所から Python ファイルを取得します。

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > spark_python_task を参照してください。 「ジョブの Python スクリプト タスク」も参照してください。

Python ホイール タスク

このタスクを使用して、Python ホイール ファイルを実行します。

次の例では、Python ホイール タスクをジョブに追加します。 デプロイする Python ホイール ファイルのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 「Databricks アセット バンドルのライブラリ依存関係」を参照してください。

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > python_wheel_task を参照してください。 「Databricks アセット バンドルを使用した Python ホイール ファイルの開発」と「ジョブの Python ホイール タスク」も参照してください。

JAR タスク

このタスクを使用して JAR を実行します。 ローカル JAR ライブラリ、またはワークスペース内のライブラリ、Unity カタログ ボリューム、または外部クラウド ストレージの場所を参照できます。 「Databricks アセット バンドルのライブラリ依存関係」を参照してください。

次の例では、JAR タスクをジョブに追加します。 JAR のパスは、指定されたボリュームの場所を指します。

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > spark_jar_task を参照してください。 「ジョブの JAR タスク」を参照してください。

SQL ファイル タスク

このタスクを使用して、ワークスペースまたはリモート Git リポジトリ内にある SQL ファイルを実行します。

次の例では、SQL ファイル タスクをジョブに追加します。 この SQL ファイル タスクは、指定した SQL ウェアハウスを使用して、指定した SQL ファイルを実行します。

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

SQL ウェアハウスの ID を取得するには、SQL ウェアハウスの設定ページを開き、[の概要] タブの [] フィールドで、倉庫の名前の後にかっこで囲まれた ID をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > sql_task > file を参照してください。 「ジョブ用のSQLタスク」を参照してください。

DLT パイプライン タスク

このタスクを使用して、DLT パイプラインを実行します。 「DLT とは」を参照してください。.

次の例では、DLT パイプライン タスクをジョブに追加します。 この DLT パイプライン タスクは、指定されたパイプラインを実行します。

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

パイプラインの ID を取得するには、ワークスペースでパイプラインを開き、パイプラインの設定ページの [パイプラインの詳細] タブで パイプライン ID 値をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > pipeline_task を参照してください。 ジョブ については、DLT パイプライン タスクのを参照してください。

dbt タスク

このタスクを使用して、1 つ以上の dbt コマンドを実行します。 「dbt Cloud に接続する」をご参照ください。

次の例では、dbt タスクをジョブに追加します。 この dbt タスクは、指定した SQL ウェアハウスを使用して、指定した dbt コマンドを実行します。

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

SQL ウェアハウスの ID を取得するには、SQL ウェアハウスの設定ページを開き、[の概要] タブの [] フィールドで、倉庫の名前の後にかっこで囲まれた ID をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > dbt_task を参照してください。 「ジョブの dbt タスク」を参照してください。

Databricks アセット バンドルには、dbt タスクを使用してジョブを定義する dbt-sql プロジェクト テンプレートと、デプロイされた dbt ジョブの dbt プロファイルも含まれています。 Databricks アセット バンドル テンプレートの詳細については、「既定のバンドル テンプレート 」を参照してください。

ジョブタスクを実行する

このタスクを使って、別のジョブを実行します。

次の例では、最初のジョブを実行する実行ジョブ タスクが、2 つめのジョブに含まれています。

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

この例では、置換を使って、実行するジョブの ID を取得します。 UI からジョブの ID を取得するには、ワークスペースでジョブを開き、ジョブの設定ページの [ジョブの詳細] タブにある ジョブ ID 値から ID をコピーします。

このタスクに対して設定できるその他のマッピングについては、REST API リファレンス POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > run_job_task を参照してください。