Поделиться через


Добавление задач в задания в Databricks Asset Bundles

В этой статье приведены примеры различных типов задач, которые можно добавить в задания Azure Databricks в пакетах ресурсов Databricks. См. раздел "Что такое пакеты ресурсов Databricks?".

Большинство типов рабочих задач имеют специфические параметры среди поддерживаемых настроек, но можно также определить параметры задания, которые передаются в задачи. Динамические ссылки на значения поддерживаются для параметров задания, которые позволяют передавать значения, относящиеся к выполнению задания между задачами. См . статью "Что такое ссылка на динамическое значение?".

Примечание.

Параметры задачи задания можно переопределить. Смотрите раздел "Переопределение параметров задач задания" в пакетах ресурсов Databricks.

Совет

Чтобы быстро создать конфигурацию ресурсов для существующего задания с помощью интерфейса командной строки Databricks, можно использовать bundle generate job команду. Смотрите команды пакета.

Задача ноутбука

Вы используете эту задачу для запуска записной книжки.

В следующем примере задача в записной книжке добавляется в задание и устанавливает параметр задания с именем my_job_run_id. Путь для развертывания записной книжки является относительным по отношению к файлу конфигурации, в котором задана эта задача. Задача получает ноутбук из места его развертывания в рабочей области Azure Databricks.

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > notebook_task в полезных данных запроса операции создания работы, как это указано в разделе POST /api/2.1/jobs/create справочной документации REST API, представленной в формате YAML. См . задачу "Записная книжка" для заданий.

Задача по условию if/else

condition_task позволяет добавить задачу с условной логикой if/else в задание. Задача оценивает условие, которое можно использовать для управления выполнением других задач. Условная задача не требует кластера для выполнения и не поддерживает повторные попытки или уведомления. Дополнительные сведения о задаче if/else см. в разделе Добавление логики ветвления в задание с помощьюзадачи If/else.

В следующем примере содержится условная задача и задача в записной книжке, где задача в записной книжке выполняется только в том случае, если количество исправлений заданий меньше 5.

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > condition_task в полезных данных запроса операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике REST API в формате YAML.

Для каждой задачи

for_each_task позволяет вам добавить задачу, использующую цикл for each, в задание. Задача выполняет вложенную задачу для каждого предоставленного ввода. Дополнительные сведения о for_each_taskсм. в разделе Использование задачи For each для выполнения другой задачи в цикле.

В следующем примере к заданию добавляется for_each_task, где он проходит по значениям другой задачи и обрабатывает их.

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

Дополнительные сопоставления, которые можно задать для этой задачи, см. в tasks > for_each_task в запросе операции создания задания, как определено в POST /api/2.1/jobs/create в документации REST API в формате YAML.

Задача скрипта Python

Эта задача используется для запуска файла Python.

В следующем примере задача скрипта Python добавляется в задание. Путь к файлу Python, который необходимо развернуть, задан относительно файла конфигурации, в котором объявлена эта задача. Задача получает файл Python из места развертывания в рабочей области Azure Databricks.

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > spark_python_task в теле запроса операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике REST API, при этом полезные данные выражены в формате YAML. См. также скрипт Python для задач.

Задача колеса Python

Вы используете эту операцию для запуска wheel-файла Python.

В следующем примере к заданию добавляется задача с использованием Python wheel. Путь к файлу колеса Python для развертывания задается относительно файла конфигурации, в котором объявлена эта задача. См. сведения о зависимостях библиотеки пакетов ресурсов Databricks.

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > python_wheel_task в запросе операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике REST API в формате YAML. См. также Создание wheel-файла на Python с помощью пакетов Databricks Asset Bundles и задачу Python Wheel для выполнения заданий.

ЗАДАЧА JAR

Эта задача используется для запуска JAR-файла. Вы можете ссылаться на локальные библиотеки JAR или на те, которые находятся в рабочей области, томе каталога Unity или во внешнем облачном хранилище. См. сведения о зависимостях библиотеки пакетов ресурсов Databricks.

В следующем примере задача JAR добавляется в задание. Путь к JAR-файлу ведёт к указанному расположению тома.

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > spark_jar_task в теле запроса операции создания задачи, как определено в POST /api/2.1/jobs/create в разделе справочника REST API, представленном в формате YAML. Смотрите задачу JAR для заданий.

Задача файла SQL

Эта задача используется для запуска SQL-файла, расположенного в рабочей области или удаленном репозитории Git.

В следующем примере задача SQL-файла добавляется в задание. Эта задача файла SQL использует указанное хранилище SQL для запуска указанного SQL-файла.

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

Чтобы получить идентификатор хранилища SQL, откройте страницу параметров хранилища SQL, а затем скопируйте идентификатор, найденный в скобках после имени хранилища в поле "Имя " на вкладке "Обзор ".

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > sql_task > file в теле запроса операции создания задания, как указано в POST /api/2.1/jobs/create в справочнике REST API, представленном в формате YAML. См. задание SQL для заданий.

Задача конвейера DLT

Вы используете эту задачу для запуска конвейера DLT. См. Что такое DLT?.

В следующем примере в задание добавляется задача конвейера DLT. Эта задача DLT запускает указанный конвейер.

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

Идентификатор конвейера можно получить, открыв конвейер в рабочей области и скопируйв значение идентификатора конвейера на вкладке сведений о конвейере страницы параметров конвейера.

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > pipeline_task в полезной нагрузке запроса операции создания задания, как определено в POST /api/2.1/jobs/create в справочнике по REST API, выраженном в формате YAML. См. задачу конвейера DLT для заданий.

Задача dbt

Эта задача используется для выполнения одной или нескольких команд dbt. См. статью "Подключение к dbt Cloud".

В следующем примере задача dbt добавляется в задание. Эта задача dbt использует указанное хранилище SQL для выполнения указанных команд dbt.

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

Чтобы получить идентификатор хранилища SQL, откройте страницу параметров хранилища SQL, а затем скопируйте идентификатор, найденный в скобках после имени хранилища в поле "Имя " на вкладке "Обзор ".

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > dbt_task в теле запроса операции создания задания, как указано в POST /api/2.1/jobs/create в справке по REST API, выраженной в формате YAML. См. задачу dbt для рабочих процессов.

Пакеты ресурсов Databricks также включают dbt-sql шаблон проекта, который определяет задание с dbt-операцией, а также dbt-профили для развернутых dbt-заданий. Дополнительные сведения о шаблонах пакетов ресурсов Databricks см. в шаблонах пакетов по умолчанию.

Выполнение задания

Эта задача используется для выполнения другого задания.

В следующем примере содержится задача выполнения задания во втором задании, на котором выполняется первое задание.

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

В этом примере используется подстановка для получения идентификатора выполняемого задания. Чтобы получить идентификатор задания из пользовательского интерфейса, откройте задание в рабочей области и скопируйте идентификатор из значения идентификатора задания на вкладке сведений о задании на странице параметров заданий.

Дополнительные сопоставления, которые можно задать для этой задачи, смотрите в tasks > run_job_task в полезных данных запроса операции создания работы, как определено в POST /api/2.1/jobs/create в справочнике REST API, выраженном в формате YAML.