Databricks アセット バンドルのジョブにタスクを追加する
この記事では、"Databricks アセット バンドル" で Azure Databricks ジョブに追加できるさまざまな種類のタスクの例を示します。 「Databricks アセット バンドルとは」をご覧ください。
ほとんどのジョブ タスクの種類では、サポートされている設定の中にタスク固有のparametersが含まれますが、タスクに渡すジョブ parametersを定義することもできます。 動的値参照は、ジョブ parametersでサポートされており、タスク間でジョブの実行に固有の values を渡すことができます。 「動的値参照とは?」を参照してください。
Note
ジョブ タスクの設定をオーバーライドできます。 「Databricks アセット バンドルのジョブ タスク設定をオーバーライドする」を参照してください。
ヒント
Databricks CLIを使用して既存のジョブのリソース構成をすばやく generate するには、bundle generate job
コマンドを使用します。 bundle コマンドをご参照ください。
ノートブック タスク
このタスクを使用してノートブックを実行します。
次の例では、ノートブック タスクをジョブに追加し、my_job_run_id
という名前のジョブ パラメーターを設定します。 デプロイするノートブックのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 このタスクは、Azure Databricks ワークスペース内のデプロイされた場所からノートブックを取得します。
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: "{{job.run_id}}"
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > notebook_task
を参照してください。 「ジョブの Notebook タスク」を参照してください。
If/else 条件タスク
condition_task
を使用すると、if/else 条件付きロジックを持つタスクをジョブに追加できます。 タスクは、他のタスクの実行を制御するために使用できる条件を評価します。 条件タスクは、クラスターの実行を必要とせず、再試行や通知をサポートしません。 if/else タスクの詳細については、「if/else タスク を使用してジョブに分岐ロジックを追加する」を参照してください。
次の例には、条件タスクとノートブック タスクが含まれています。where ノートブック タスクは、ジョブの修復の数が 5 未満の場合にのみ実行されます。
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: "{{job.repair_count}}"
right: "5"
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: "true"
notebook_task:
notebook_path: ../src/notebook.ipynb
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > condition_task
を参照してください。
タスクごと
for_each_task
を使用すると、for each ループを使用したタスクをジョブに追加できます。 このタスクは、指定されたすべての入力に対して入れ子になったタスクを実行します。 for_each_task
の詳細については、「ループでパラメーター化された Azure Databricks ジョブ タスクを実行する」を参照してください。
次の例では、ジョブに for_each_task
を追加します。ここで別のタスクの values がループ処理され、それらが処理されます。
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: "{{tasks.generate_countries_list.values.countries}}"
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > for_each_task
を参照してください。
Python スクリプト タスク
このタスクを使用して、Python ファイルを実行します。
次の例では、Python スクリプト タスクをジョブに追加します。 デプロイする Python ファイルのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 このタスクは、Azure Databricks ワークスペース内のデプロイされた場所から Python ファイルを取得します。
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > spark_python_task
を参照してください。 「ジョブの Python スクリプト タスク」も参照してください。
Python ホイール タスク
このタスクを使用して、Python ホイール ファイルを実行します。
次の例では、Python ホイール タスクをジョブに追加します。 デプロイする Python ホイール ファイルのパスは、このタスクが宣言されている構成ファイルに対する相対パスです。 「Databricks アセット バンドルのライブラリ依存関係」を参照してください。
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > python_wheel_task
を参照してください。 「Databricks アセット バンドルを使用した Python ホイール ファイルの開発」と「ジョブの Python ホイール タスク」も参照してください。
JAR タスク
このタスクを使用して JAR を実行します。 ローカル JAR ライブラリ、ワークスペース内の JAR ライブラリ、Unity Catalog ボリューム、または外部クラウド ストレージの場所を参照できます。 「Databricks アセット バンドルのライブラリ依存関係」を参照してください。
次の例では、JAR タスクをジョブに追加します。 JAR のパスは、指定されたボリュームの場所を指します。
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > spark_jar_task
を参照してください。 「ジョブの JAR タスク」を参照してください。
SQL ファイル タスク
このタスクを使用して、ワークスペースまたはリモート Git リポジトリ内にある SQL ファイルを実行します。
次の例では、SQL ファイル タスクをジョブに追加します。 この SQL ファイル タスクは、指定した SQL ウェアハウスを使用して、指定した SQL ファイルを実行します。
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
SQL ウェアハウスの ID を get するには、SQL ウェアハウスの設定ページを開き、[の概要] タブの [名] フィールドで、倉庫の名前の後にかっこで囲まれた ID をコピーします。
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > sql_task > file
を参照してください。 「ジョブの SQL タスク」を参照してください。
Delta Live Tables パイプライン タスク
このタスクを使用して、Delta Live Tables パイプラインを実行します。 デルタ ライブとは何ですか?Tablesを参照してください。.
次の例では、Delta Live Tables パイプライン タスクをジョブに追加します。 この Delta Live Tables パイプライン タスクは、指定されたパイプラインを実行します。
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
パイプラインの ID を get するには、ワークスペースでパイプラインを開き、パイプラインの設定ページの [パイプラインの詳細] タブで パイプライン ID 値をコピーします。
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > pipeline_task
を参照してください。 「ジョブの Delta Live Tables パイプライン タスク」を参照してください。
dbt タスク
このタスクを使用して、1 つ以上の dbt コマンドを実行します。 「dbt Cloud に接続する」をご参照ください。
次の例では、dbt タスクをジョブに追加します。 この dbt タスクは、指定した SQL ウェアハウスを使用して、指定した dbt コマンドを実行します。
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- "dbt deps"
- "dbt seed"
- "dbt run"
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: "dbt-databricks>=1.0.0,<2.0.0"
SQL ウェアハウスの ID を get するには、SQL ウェアハウスの設定ページを開き、[の概要] タブの [名] フィールドで、倉庫の名前の後にかっこで囲まれた ID をコピーします。
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > dbt_task
を参照してください。 「ジョブの dbt タスク」を参照してください。
Databricks アセット バンドルには、dbt タスクを使用してジョブを定義する dbt-sql
プロジェクト テンプレートと、デプロイされた dbt ジョブの dbt プロファイルも含まれています。 Databricks アセット バンドル テンプレートの詳細については、「既定のバンドル テンプレートを使用する」を参照してください。
実行ジョブ タスク
このタスクを使って、別のジョブを実行します。
次の例には、最初のジョブを実行する 2 つ目のジョブに実行ジョブ タスクが含まれています。
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: "13.3.x-scala2.12"
node_type_id: "i3.xlarge"
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
この例では、置換を使って、実行するジョブの ID を取得します。 UI からジョブの ID を get するには、ワークスペースでジョブを開き、ジョブの設定ページの [ジョブの詳細] タブの ジョブ ID 値から ID をコピーします。
このタスクに set できるその他のマッピングについては、REST API リファレンスの POST /api/2.1/jobs/create (YAML 形式で表される) で定義されているジョブの作成操作の要求ペイロードの tasks > run_job_task
を参照してください。