Aggiungi attività ai job nei pacchetti di risorse di Databricks
Questo articolo fornisce esempi di vari tipi di attività che è possibile aggiungere ai job di Azure Databricks in Databricks Asset Bundles. Consultare Che cosa sono i Databricks Asset Bundle?.
La maggior parte dei tipi di attività di processo include parametri specifici dell'attività tra le impostazioni supportate, ma è anche possibile definire i parametri del processo che vengono passati alle attività. I riferimenti a valori dinamici sono supportati per i parametri del processo, che consentono di passare valori specifici per l'esecuzione del processo tra le attività. Vedere Che cos'è un valore dinamico di riferimento?.
Nota
È possibile eseguire l'override delle impostazioni delle attività di lavoro. Vedi Impostazioni di override delle attività di lavoro nei pacchetti di risorse di Databricks.
Suggerimento
Per generare rapidamente la configurazione delle risorse per un processo esistente usando l'interfaccia della riga di comando di Databricks, è possibile usare il comando bundle generate job
. Vedere i comandi del bundle.
Attività del notebook
Usi questo compito per eseguire un notebook.
Nell'esempio seguente viene aggiunta un'attività notebook a un processo e viene impostato un parametro di processo denominato my_job_run_id
. Il percorso del notebook da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività ottiene il notebook dalla posizione implementata nell'area di lavoro di Azure Databricks.
resources:
jobs:
my-notebook-job:
name: my-notebook-job
tasks:
- task_key: my-notebook-task
notebook_task:
notebook_path: ./my-notebook.ipynb
parameters:
- name: my_job_run_id
default: '{{job.run_id}}'
Per altri mapping che è possibile impostare per questa attività, vedere tasks > notebook_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedi Attività del notebook per le attività.
Attività di condizione If/else
Il condition_task
consente di aggiungere una fase con logica condizionale if/else al tuo lavoro. L'attività valuta una condizione che può essere usata per controllare l'esecuzione di altre attività. L'attività condizione non richiede un cluster per l'esecuzione e non supporta ritenti o notifiche. Per altre informazioni sull'attività if/else, vedere Aggiungere logica di diramazione a un processo con l'attività If/else.
L'esempio seguente contiene un'attività condizionale e un'attività di notebook, in cui l'attività di notebook viene eseguita solo se il numero di riparazioni del compito è inferiore a 5.
resources:
jobs:
my-job:
name: my-job
tasks:
- task_key: condition_task
condition_task:
op: LESS_THAN
left: '{{job.repair_count}}'
right: '5'
- task_key: notebook_task
depends_on:
- task_key: condition_task
outcome: 'true'
notebook_task:
notebook_path: ../src/notebook.ipynb
Per altri mapping che è possibile impostare per questa attività, vedere tasks > condition_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.
Per ogni attività
Il for_each_task
consente di aggiungere un'operazione con un ciclo 'for each' al tuo processo. L'attività esegue un'attività nidificata per ciascun input fornito. Per ulteriori informazioni sulla for_each_task
, consultare Utilizzare un'attività di For each
per eseguire un'altra attività in un ciclo.
Nell'esempio seguente viene aggiunto un for_each_task
a un'attività, iterando sui valori di un'altra attività e processandoli.
resources:
jobs:
my_job:
name: my_job
tasks:
- task_key: generate_countries_list
notebook_task:
notebook_path: ../src/generate_countries_list.ipnyb
- task_key: process_countries
depends_on:
- task_key: generate_countries_list
for_each_task:
inputs: '{{tasks.generate_countries_list.values.countries}}'
task:
task_key: process_countries_iteration
notebook_task:
notebook_path: ../src/process_countries_notebook.ipnyb
Per altri mapping che è possibile impostare per questa attività, vedere tasks > for_each_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.
Attività di script Python
Utilizzi questa attività per eseguire un file Python.
L'esempio seguente aggiunge un'attività di script Python a un lavoro. Il percorso del file Python da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. L'attività recupera il file Python dalla posizione in cui è stato distribuito nell'area di lavoro di Azure Databricks.
resources:
jobs:
my-python-script-job:
name: my-python-script-job
tasks:
- task_key: my-python-script-task
spark_python_task:
python_file: ./my-script.py
Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_python_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere anche Attività script Python per i processi.
Attività "Python wheel"
Usi questa attività per eseguire un file Python wheel.
L'esempio seguente aggiunge un task di tipo wheel per Python a un job. Il percorso del file Python wheel da distribuire è relativo al file di configurazione in cui è dichiarata questa attività. Consulta Dipendenze delle librerie Asset Bundles di Databricks.
resources:
jobs:
my-python-wheel-job:
name: my-python-wheel-job
tasks:
- task_key: my-python-wheel-task
python_wheel_task:
entry_point: run
package_name: my_package
libraries:
- whl: ./my_package/dist/my_package-*.whl
Per altri mapping che è possibile impostare per questa attività, vedere tasks > python_wheel_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Consultare anche Sviluppare un file Python wheel utilizzando i Databricks Asset Bundles e Attività Python Wheel per i lavori.
Attività JAR
Usate questa attività per eseguire un file JAR. È possibile fare riferimento a librerie JAR locali o in un'area di lavoro, in un volume del catalogo Unity o in un percorso di archiviazione cloud esterno. Consulta Dipendenze delle librerie Asset Bundles di Databricks.
L'esempio seguente aggiunge un file JAR a un processo. Il percorso del file JAR si trova nella posizione del volume specificata.
resources:
jobs:
my-jar-job:
name: my-jar-job
tasks:
- task_key: my-jar-task
spark_jar_task:
main_class_name: org.example.com.Main
libraries:
- jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar
Per altri mapping che è possibile impostare per questa attività, vedere tasks > spark_jar_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività JAR per i lavori.
Attività file SQL
Questa attività viene usata per eseguire un file SQL che si trova in un'area di lavoro o in un repository Git remoto.
L'esempio seguente aggiunge un file SQL a un processo. Questa attività file SQL usa il warehouse SQL specificato per eseguire il file SQL specificato.
resources:
jobs:
my-sql-file-job:
name: my-sql-file-job
tasks:
- task_key: my-sql-file-task
sql_task:
file:
path: /Users/someone@example.com/hello-world.sql
source: WORKSPACE
warehouse_id: 1a111111a1111aa1
Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > sql_task > file
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere Attività SQL per i processi.
Attività della catena DLT
Usi questo compito per eseguire una pipeline DLT. Vedere Che cos'è DLT?.
Nell'esempio seguente viene aggiunta un'attività della pipeline DLT a un compito. Questa attività della pipeline DLT esegue la pipeline specificata.
resources:
jobs:
my-pipeline-job:
name: my-pipeline-job
tasks:
- task_key: my-pipeline-task
pipeline_task:
pipeline_id: 11111111-1111-1111-1111-111111111111
È possibile ottenere l'ID di una pipeline aprendo la pipeline nell'area di lavoro e copiando il valore dell'ID pipeline nella scheda Dettagli pipeline della pagina delle impostazioni della pipeline.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > pipeline_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Vedere l'attività della pipeline DLT per i lavori.
attività di dbt
Questa attività viene usata per eseguire uno o più comandi dbt. Vedere Connettersi a dbt Cloud.
L'esempio seguente aggiunge un'attività dbt a un compito. Questa attività dbt usa il data warehouse SQL specificato per eseguire i comandi dbt specificati.
resources:
jobs:
my-dbt-job:
name: my-dbt-job
tasks:
- task_key: my-dbt-task
dbt_task:
commands:
- 'dbt deps'
- 'dbt seed'
- 'dbt run'
project_directory: /Users/someone@example.com/Testing
warehouse_id: 1a111111a1111aa1
libraries:
- pypi:
package: 'dbt-databricks>=1.0.0,<2.0.0'
Per ottenere l’ID di un warehouse SQL, aprire la pagina delle impostazioni del warehouse SQL, quindi copiare l'ID tra parentesi dopo il nome del warehouse nel campo Nome della scheda Panoramica.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > dbt_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML. Consultare attività dbt per i lavori.
I bundle di asset di Databricks includono anche un modello di progetto dbt-sql
che definisce un processo con un'attività dbt, nonché i profili dbt per i processi dbt distribuiti. Per informazioni sui modelli di aggregazioni di asset di Databricks, vedere Modelli di bundle predefiniti.
Eseguire l'attività del processo
Usi questa attività per eseguire un altro lavoro.
L'esempio seguente contiene un'attività di esecuzione nel secondo lavoro che esegue il primo lavoro.
resources:
jobs:
my-first-job:
name: my-first-job
tasks:
- task_key: my-first-job-task
new_cluster:
spark_version: '13.3.x-scala2.12'
node_type_id: 'i3.xlarge'
num_workers: 2
notebook_task:
notebook_path: ./src/test.py
my_second_job:
name: my-second-job
tasks:
- task_key: my-second-job-task
run_job_task:
job_id: ${resources.jobs.my-first-job.id}
In questo esempio viene utilizzata una sostituzione per recuperare l'ID del processo da eseguire. Per ottenere l'ID di un processo dall'interfaccia utente, aprire il processo nell'area di lavoro e copiare l'ID dal valore ID processo nella scheda Dettagli processo della pagina delle impostazioni dei processi.
Per altri mapping che è possibile impostare per questa attività, vedere tasks > run_job_task
nel payload della richiesta dell'operazione di creazione del processo come definito in POST /api/2.1/jobs/create nel riferimento all'API REST, espresso in formato YAML.