Condividi tramite


Aggiungi attività ai job nei pacchetti di risorse di Databricks

Questo articolo fornisce esempi di vari tipi di attività che è possibile aggiungere ai job di Azure Databricks in Databricks Asset Bundles. Consultare Che cosa sono i Databricks Asset Bundle?.

La maggior parte dei tipi di attività di processo include parametri specifici dell'attività tra le impostazioni supportate, ma è anche possibile definire i parametri del processo che vengono passati alle attività. I riferimenti a valori dinamici sono supportati per i parametri del processo, che consentono di passare valori specifici per l'esecuzione del processo tra le attività. Vedere Che cos'è un valore dinamico di riferimento?.

Nota

È possibile eseguire l'override delle impostazioni delle attività di lavoro. Vedi Impostazioni di override delle attività di lavoro nei pacchetti di risorse di Databricks.

Suggerimento

Per generare rapidamente la configurazione delle risorse per un processo esistente usando l'interfaccia della riga di comando di Databricks, è possibile usare il comando bundle generate job. Vedere i comandi del bundle.

Attività del notebook

Usi questo compito per eseguire un notebook.

Nell'esempio seguente viene aggiunta un'attività notebook a un processo e viene impostato un parametro di processo denominato my_job_run_id. Il percorso del notebook da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività ottiene il notebook dalla posizione implementata nell'area di lavoro di Azure Databricks.

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

Per altri mapping che è possibile impostare per questa attività, vedere tasks > notebook_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedi Attività del notebook per le attività.

Attività di condizione If/else

Il condition_task consente di aggiungere una fase con logica condizionale if/else al tuo lavoro. L'attività valuta una condizione che può essere usata per controllare l'esecuzione di altre attività. L'attività condizione non richiede un cluster per l'esecuzione e non supporta ritenti o notifiche. Per altre informazioni sull'attività if/else, vedere Aggiungere logica di diramazione a un processo con l'attività If/else.

L'esempio seguente contiene un'attività condizionale e un'attività di notebook, in cui l'attività di notebook viene eseguita solo se il numero di riparazioni del compito è inferiore a 5.

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

Per altri mapping che è possibile impostare per questa attività, vedere tasks > condition_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.

Per ogni attività

Il for_each_task consente di aggiungere un'operazione con un ciclo 'for each' al tuo processo. L'attività esegue un'attività nidificata per ciascun input fornito. Per ulteriori informazioni sulla for_each_task, consultare Utilizzare un'attività di For each per eseguire un'altra attività in un ciclo.

Nell'esempio seguente viene aggiunto un for_each_task a un'attività, iterando sui valori di un'altra attività e processandoli.

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

Per altri mapping che è possibile impostare per questa attività, vedere tasks > for_each_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.

Attività di script Python

Utilizzi questa attività per eseguire un file Python.

L'esempio seguente aggiunge un'attività di script Python a un lavoro. Il percorso del file Python da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività recupera il file Python dalla posizione in cui è stato distribuito nell'area di lavoro di Azure Databricks.

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_python_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere anche Attività script Python per i processi.

Attività "Python wheel"

Usi questa attività per eseguire un file Python wheel.

L'esempio seguente aggiunge un task di tipo wheel per Python a un job. Il percorso del file Python wheel da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. Consulta Dipendenze delle librerie Asset Bundles di Databricks.

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

Per altri mapping che è possibile impostare per questa attività, vedere tasks > python_wheel_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Consultare anche Sviluppare un file Python wheel utilizzando i Databricks Asset Bundles e Attività Python Wheel per i lavori.

Attività JAR

Usate questa attività per eseguire un file JAR. È possibile fare riferimento a librerie JAR locali o in un'area di lavoro, in un volume del catalogo Unity o in un percorso di archiviazione cloud esterno. Consulta Dipendenze delle librerie Asset Bundles di Databricks.

L'esempio seguente aggiunge un file JAR a un processo. Il percorso del file JAR si trova nella posizione del volume specificata.

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_jar_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività JAR per i lavori.

Attività file SQL

Questa attività viene usata per eseguire un file SQL che si trova in un'area di lavoro o in un repository Git remoto.

L'esempio seguente aggiunge un file SQL a un processo. Questa attività file SQL usa il warehouse SQL specificato per eseguire il file SQL specificato.

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > sql_task > file nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività SQL per i processi.

Attività della catena DLT

Usi questo compito per eseguire una pipeline DLT. Vedere Che cos'è DLT?.

Nell'esempio seguente viene aggiunta un'attività della pipeline DLT a un compito. Questa attività della pipeline DLT esegue la pipeline specificata.

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

È possibile ottenere l'ID di una pipeline aprendo la pipeline nell'area di lavoro e copiando il valore dell'ID pipeline nella scheda Dettagli pipeline della pagina delle impostazioni della pipeline.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > pipeline_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere l'attività della pipeline DLT per i lavori.

attività di dbt

Questa attività viene usata per eseguire uno o più comandi dbt. Vedere Connettersi a dbt Cloud.

L'esempio seguente aggiunge un'attività dbt a un compito. Questa attività dbt usa il data warehouse SQL specificato per eseguire i comandi dbt specificati.

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > dbt_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Consultare attività dbt per i lavori.

I bundle di asset di Databricks includono anche un modello di progetto dbt-sql che definisce un processo con un'attività dbt, nonché i profili dbt per i processi dbt distribuiti. Per informazioni sui modelli di aggregazioni di asset di Databricks, vedere Modelli di bundle predefiniti.

Eseguire l'attività del processo

Usi questa attività per eseguire un altro lavoro.

L'esempio seguente contiene un'attività di esecuzione nel secondo lavoro che esegue il primo lavoro.

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

In questo esempio viene utilizzata una sostituzione per recuperare l'ID del processo da eseguire. Per ottenere l'ID di un processo dall'interfaccia utente, aprire il processo nell'area di lavoro e copiare l'ID dal valore ID processo nella scheda Dettagli processo della pagina delle impostazioni dei processi.

Per altri mapping che è possibile impostare per questa attività, vedere tasks > run_job_task nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.