Condividi tramite


Eseguire un notebook di Databricks da un altro notebook

Importante

Per l'orchestrazione dei notebook, usare Databricks Jobs. Per gli scenari di modularizzazione del codice, usare i file di area di lavoro. È consigliabile usare le tecniche descritte in questo articolo solo quando il caso d'uso non può essere implementato usando un processo di Databricks, ad esempio per il ciclo dei notebook su un set dinamico di parametri o se non si ha accesso ai file di area di lavoro. Per altre informazioni, vedere Pianificare e orchestrare i flussi di lavoro e il codice condiviso.

Confronto tra %run e dbutils.notebook.run()

Il comando %run consente di includere un altro notebook all'interno di un notebook. È possibile usare %run per modularizzare il codice, ad esempio inserendo funzioni di supporto in un notebook separato. È anche possibile usarlo per concatenare i notebook che implementano i passaggi in un'analisi. Quando si usa %run, il notebook richiamato viene eseguito immediatamente e le funzioni e le variabili definite in esso diventano disponibili nel notebook chiamante.

L'API dbutils.notebook è un complemento a %run perché consente di passare parametri a e restituire valori da un notebook. In questo modo è possibile creare flussi di lavoro e pipeline complessi con dipendenze. Ad esempio, è possibile ottenere un elenco di file in una directory e passare i nomi a un altro notebook, operazione non è possibile con %run. È anche possibile creare flussi di lavoro if-then-else basati su valori restituiti o chiamare altri notebook usando percorsi relativi.

A differenza di %run, il metodo dbutils.notebook.run() avvia un nuovo processo per eseguire il notebook.

Questi metodi, come tutte le API dbutils, sono disponibili solo in Python e Scala. Tuttavia, è possibile usare dbutils.notebook.run() per richiamare un notebook R.

Usare %run per importare un notebook

In questo esempio il primo notebook definisce una funzione, reverse, disponibile nel secondo notebook dopo aver usato il comando magic %run per eseguire shared-code-notebook.

Notebook di codice condiviso

Esempio di importazione del notebook

Poiché entrambi questi notebook si trovano nella stessa directory nell'area di lavoro, usare il prefisso ./ in ./shared-code-notebook per indicare che il percorso deve essere risolto rispetto al notebook attualmente in esecuzione. È possibile organizzare i notebook in directory, ad esempio %run ./dir/notebook, o usare un percorso assoluto, ad esempio %run /Users/username@organization.com/directory/notebook.

Nota

  • Il comando %run deve trovarsi in una cella da solo, perché esegue l'intero notebook inline.
  • Non è possibile usare %run per eseguire un file Python e import le entità definite in tale file in un notebook. Per importare da un file Python, vedere Modularizzare il codice usando i file. In alternativa, creare un pacchetto del file in una libreria Python, creare una libreria di Azure Databricks da tale libreria Python e installare la libreria nel cluster usato per eseguire il notebook.
  • Quando si usa %run per eseguire un notebook che contiene widget, per impostazione predefinita il notebook specificato viene eseguito con i valori predefiniti del widget. È anche possibile passare valori ai widget; vedere Usare i widget di Databricks con %run.

API dbutils.notebook

I metodi disponibili nell'API dbutils.notebook sono run e exit. Sia i parametri che i valori di ritorno devono essere stringhe.

run(path: String, timeout_seconds: int, arguments: Map): String

Eseguire un notebook e restituirne il valore di uscita. Il metodo avvia un processo temporaneo che viene eseguito immediatamente.

Il parametro timeout_seconds controlla il timeout dell'esecuzione (0 indica che non si verifica alcun timeout): la chiamata a run genera un'eccezione se non termina entro l'ora specificata. Se Azure Databricks è inattivo per più di 10 minuti, l'esecuzione del notebook ha esito negativo indipendentemente da timeout_seconds.

Il parametro arguments imposta i valori del widget del notebook di destinazione. In particolare, se il notebook in esecuzione ha un widget chiamato A, e si passa una coppia chiave-valore ("A": "B") come parte del parametro argomenti della chiamata run(), il recupero del valore del widget A restituirà "B". Le istruzioni per la creazione e l'uso dei widget sono disponibili nell'articolo Widget di Databricks.

Nota

  • Il parametro arguments accetta solo caratteri latini (set di caratteri ASCII). L'utilizzo di caratteri non ASCII restituisce un errore.
  • I processi creati con l'API dbutils.notebook devono essere completati entro 30 giorni o meno.

Uso di run

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

Esempio di run

Si supponga di avere un notebook denominato workflows con un widget denominato foo che stampa il valore del widget:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

L'esecuzione dbutils.notebook.run("workflows", 60, {"foo": "bar"}) produce il seguente risultato:

Notebook del widget

Il widget ha il valore passato usando dbutils.notebook.run(), "bar", anziché il valore predefinito.

exit(value: String): void chiude un notebook con un valore. Se si richiama un notebook usando il metodo run, questo è il valore restituito.

dbutils.notebook.exit("returnValue")

La chiamata dbutils.notebook.exit in un processo determina il completamento corretto del notebook. Se si vuole causare un errore del processo, generare un'eccezione.

Esempio di

Nell'esempio seguente si passano argomenti a DataImportNotebook e si eseguono notebook diversi (DataCleaningNotebook o ErrorHandlingNotebook) in base al risultato di DataImportNotebook.

Esempio di if-else

Quando viene eseguito il codice, viene visualizzata una tabella contenente un collegamento al notebook in esecuzione:

Collegamento al notebook in esecuzione

Per visualizzare i dettagli dell'esecuzione, cliccare il collegamento Ora di inizio nella tabella. Se l'esecuzione è stata completata, è anche possibile visualizzare i dettagli dell'esecuzione cliccando il collegamento Ora di fine.

Risultato dell'esecuzione temporanea del notebook

Passare dati strutturati

Questa sezione illustra come passare dati strutturati tra notebook.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

Gestione degli errori

Questa sezione spiega come gestire gli errori.

Python

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Eseguire più notebook simultaneamente

È possibile eseguire più notebook contemporaneamente usando costrutti Scala e Python standard, ad esempio (Scala, Python) e Futures (Scala, Python). I notebook di esempio illustrano come usare questi costrutti.

  1. Scaricare i 4 notebook seguenti. I notebook sono scritti in Scala.
  2. Importare i notebook in una singola cartella nell'area di lavoro.
  3. Eseguire il notebook Esegui contemporaneamente.

Eseguire contemporaneamente il notebook

Ottenere il notebook

Eseguire in parallelo notebook

Ottenere il notebook

Notebook di test

Ottenere il notebook

Notebook testing-2

Ottenere il notebook