Добавление задач в задания в Databricks Asset Bundles
В этой статье приведены примеры различных типов задач, которые можно добавить в задания Azure Databricks в пакетах ресурсов Databricks. См. раздел "Что такое пакеты ресурсов Databricks?".
Большинство типов рабочих задач имеют специфические параметры среди поддерживаемых настроек, но можно также определить параметры задания, которые передаются в задачи. Динамические ссылки на значения поддерживаются для параметров задания, которые позволяют передавать значения, относящиеся к выполнению задания между задачами. См . статью "Что такое ссылка на динамическое значение?".
Примечание.
Параметры задачи задания можно переопределить. Смотрите раздел "Переопределение параметров задач задания" в пакетах ресурсов Databricks.
Совет
Чтобы быстро создать конфигурацию ресурсов для существующего задания с помощью интерфейса командной строки Databricks, можно использовать bundle generate job
команду. Смотрите команды пакета.
Задача ноутбука
Вы используете эту задачу для запуска записной книжки.
В следующем примере задача в записной книжке добавляется в задание и устанавливает параметр задания с именем my_job_run_id
. Путь для развертывания записной книжки является относительным по отношению к файлу конфигурации, в котором задана эта задача. Задача получает ноутбук из места его развертывания в рабочей области Azure Databricks.
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: '{{job.run_id}}'
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > notebook_task
в полезных данных запроса операции создания работы, как это указано в разделе POST /api/2.1/jobs/create справочной документации REST API, представленной в формате YAML. См . задачу "Записная книжка" для заданий.
Задача по условию if/else
condition_task
позволяет добавить задачу с условной логикой if/else в задание. Задача оценивает условие, которое можно использовать для управления выполнением других задач. Условная задача не требует кластера для выполнения и не поддерживает повторные попытки или уведомления. Дополнительные сведения о задаче if/else см. в разделе Добавление логики ветвления в задание с помощьюзадачи If/else.
В следующем примере содержится условная задача и задача в записной книжке, где задача в записной книжке выполняется только в том случае, если количество исправлений заданий меньше 5.
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: '{{job.repair_count}}'
right: '5'
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: 'true'
notebook_task:
notebook_path: ../src/notebook.ipynb
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > condition_task
в полезных данных запроса операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике REST API в формате YAML.
Для каждой задачи
for_each_task
позволяет вам добавить задачу, использующую цикл for each, в задание. Задача выполняет вложенную задачу для каждого предоставленного ввода. Дополнительные сведения о for_each_task
см. в разделе Использование задачи For each
для выполнения другой задачи в цикле.
В следующем примере к заданию добавляется for_each_task
, где он проходит по значениям другой задачи и обрабатывает их.
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: '{{tasks.generate_countries_list.values.countries}}'
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
Дополнительные сопоставления, которые можно задать для этой задачи, см. в tasks > for_each_task
в запросе операции создания задания, как определено в POST /api/2.1/jobs/create в документации REST API в формате YAML.
Задача скрипта Python
Эта задача используется для запуска файла Python.
В следующем примере задача скрипта Python добавляется в задание. Путь к файлу Python, который необходимо развернуть, задан относительно файла конфигурации, в котором объявлена эта задача. Задача получает файл Python из места развертывания в рабочей области Azure Databricks.
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > spark_python_task
в теле запроса операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике REST API, при этом полезные данные выражены в формате YAML. См. также скрипт Python для задач.
Задача колеса Python
Вы используете эту операцию для запуска wheel-файла Python.
В следующем примере к заданию добавляется задача с использованием Python wheel. Путь к файлу колеса Python для развертывания задается относительно файла конфигурации, в котором объявлена эта задача. См. сведения о зависимостях библиотеки пакетов ресурсов Databricks.
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > python_wheel_task
в запросе операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике REST API в формате YAML. См. также Создание wheel-файла на Python с помощью пакетов Databricks Asset Bundles и задачу Python Wheel для выполнения заданий.
ЗАДАЧА JAR
Эта задача используется для запуска JAR-файла. Вы можете ссылаться на локальные библиотеки JAR или на те, которые находятся в рабочей области, томе каталога Unity или во внешнем облачном хранилище. См. сведения о зависимостях библиотеки пакетов ресурсов Databricks.
В следующем примере задача JAR добавляется в задание. Путь к JAR-файлу ведёт к указанному расположению тома.
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > spark_jar_task
в теле запроса операции создания задачи, как определено в POST /api/2.1/jobs/create в разделе справочника REST API, представленном в формате YAML. Смотрите задачу JAR для заданий.
Задача файла SQL
Эта задача используется для запуска SQL-файла, расположенного в рабочей области или удаленном репозитории Git.
В следующем примере задача SQL-файла добавляется в задание. Эта задача файла SQL использует указанное хранилище SQL для запуска указанного SQL-файла.
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
Чтобы получить идентификатор хранилища SQL, откройте страницу параметров хранилища SQL, а затем скопируйте идентификатор, найденный в скобках после имени хранилища в поле "Имя " на вкладке "Обзор ".
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > sql_task > file
в теле запроса операции создания задания, как указано в POST /api/2.1/jobs/create в справочнике REST API, представленном в формате YAML. См. задание SQL для заданий.
Задача конвейера DLT
Вы используете эту задачу для запуска конвейера DLT. См. Что такое DLT?.
В следующем примере в задание добавляется задача конвейера DLT. Эта задача DLT запускает указанный конвейер.
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
Идентификатор конвейера можно получить, открыв конвейер в рабочей области и скопируйв значение идентификатора конвейера на вкладке сведений о конвейере страницы параметров конвейера.
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > pipeline_task
в полезной нагрузке запроса операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике по REST API, выраженном в формате YAML. См. задачу конвейера DLT для заданий.
Задача dbt
Эта задача используется для выполнения одной или нескольких команд dbt. См. статью "Подключение к dbt Cloud".
В следующем примере задача dbt добавляется в задание. Эта задача dbt использует указанное хранилище SQL для выполнения указанных команд dbt.
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- 'dbt deps'
- 'dbt seed'
- 'dbt run'
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: 'dbt-databricks>=1.0.0,<2.0.0'
Чтобы получить идентификатор хранилища SQL, откройте страницу параметров хранилища SQL, а затем скопируйте идентификатор, найденный в скобках после имени хранилища в поле "Имя " на вкладке "Обзор ".
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > dbt_task
в теле запроса операции создания задания, как указано в POST /api/2.1/jobs/create в справке по REST API, выраженной в формате YAML. См. задачу dbt для рабочих процессов.
Пакеты ресурсов Databricks также включают dbt-sql
шаблон проекта, который определяет задание с dbt-операцией, а также dbt-профили для развернутых dbt-заданий. Дополнительные сведения о шаблонах пакетов ресурсов Databricks см. в шаблонах пакетов по умолчанию.
Выполнение задания
Эта задача используется для выполнения другого задания.
В следующем примере содержится задача выполнения задания во втором задании, на котором выполняется первое задание.
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: '13.3.x-scala2.12'
node_type_id: 'i3.xlarge'
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
В этом примере используется подстановка для получения идентификатора выполняемого задания. Чтобы получить идентификатор задания из пользовательского интерфейса, откройте задание в рабочей области и скопируйте идентификатор из значения идентификатора задания на вкладке сведений о задании на странице параметров заданий.
Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > run_job_task
в полезных данных запроса операции создания работы, как определено в POST /api/2.1/jobs/create в справочнике REST API, выраженном в формате YAML.