Een Databricks-notebook uitvoeren vanuit een ander notebook
Belangrijk
Gebruik Databricks-taken voor indeling van notitieblokken. Gebruik werkruimtebestanden voor code modularisatiescenario's. Gebruik alleen de technieken die in dit artikel worden beschreven wanneer uw use-case niet kan worden geïmplementeerd met behulp van een Databricks-taak, zoals voor het herhalen van notebooks via een dynamische set parameters of als u geen toegang hebt tot werkruimtebestanden. Zie Werkstromen plannen en organiseren en code delen voor meer informatie.
Vergelijking van %run
en dbutils.notebook.run()
Met de %run
opdracht kunt u een ander notitieblok opnemen in een notitieblok. U kunt bijvoorbeeld uw %run
code modulariseren door ondersteunende functies in een afzonderlijk notebook te plaatsen. U kunt deze ook gebruiken om notebooks samen te voegen die de stappen in een analyse implementeren. Wanneer u het notitieblok gebruikt %run
, wordt het aangeroepen notebook onmiddellijk uitgevoerd en worden de functies en variabelen die erin zijn gedefinieerd, beschikbaar in het aanroepende notebook.
De dbutils.notebook
API is een aanvulling op %run
omdat u hiermee parameters kunt doorgeven aan en waarden kunt retourneren uit een notebook. Hiermee kunt u complexe werkstromen en pijplijnen bouwen met afhankelijkheden. U kunt bijvoorbeeld een lijst met bestanden in een map ophalen en de namen doorgeven aan een ander notitieblok, wat niet mogelijk is met %run
. U kunt ook if-then-else-werkstromen maken op basis van retourwaarden of andere notebooks aanroepen met behulp van relatieve paden.
In tegenstelling tot %run
, wordt met de dbutils.notebook.run()
methode een nieuwe taak gestart om het notebook uit te voeren.
Deze methoden, zoals alle dbutils
API's, zijn alleen beschikbaar in Python en Scala. U kunt echter ook dbutils.notebook.run()
een R-notebook aanroepen.
Gebruiken %run
om een notebook te importeren
In dit voorbeeld definieert het eerste notebook een functie, reverse
die beschikbaar is in het tweede notebook nadat u de %run
magie hebt gebruikt om uit te voeren shared-code-notebook
.
Omdat beide notebooks zich in dezelfde map in de werkruimte bevinden, gebruikt u het voorvoegsel ./
om ./shared-code-notebook
aan te geven dat het pad moet worden omgezet ten opzichte van het huidige actieve notebook. U kunt notitieblokken ordenen in mappen, zoals %run ./dir/notebook
, of een absoluut pad gebruiken, zoals %run /Users/username@organization.com/directory/notebook
.
Notitie
%run
moet op zichzelf in een cel staan, omdat het hele notebook inline wordt uitgevoerd.- U kunt een Python-bestand en
import
de entiteiten die in dat bestand zijn gedefinieerd, niet uitvoeren%run
in een notebook. Als u wilt importeren uit een Python-bestand, raadpleegt u Uw code modulariseren met behulp van bestanden. U kunt het bestand ook inpakken in een Python-bibliotheek, een Azure Databricks-bibliotheek maken vanuit die Python-bibliotheek en de bibliotheek installeren in het cluster dat u gebruikt om uw notebook uit te voeren. - Wanneer u
%run
een notebook met widgets uitvoert, wordt het opgegeven notitieblok standaard uitgevoerd met de standaardwaarden van de widget. U kunt ook waarden doorgeven aan widgets; zie Databricks-widgets gebruiken met %run.
dbutils.notebook
API
De methoden die beschikbaar zijn in de dbutils.notebook
API zijn run
en exit
. Zowel parameters als retourwaarden moeten tekenreeksen zijn.
run(path: String, timeout_seconds: int, arguments: Map): String
Voer een notebook uit en retourneer de afsluitwaarde. De methode start een tijdelijke taak die onmiddellijk wordt uitgevoerd.
De timeout_seconds
parameter bepaalt de time-out van de uitvoering (0 betekent geen time-out): de aanroep om een uitzondering te run
genereren als deze niet binnen de opgegeven tijd wordt voltooid. Als Azure Databricks langer dan 10 minuten uitvalt, mislukt de uitvoering van timeout_seconds
het notebook, ongeacht.
Met arguments
de parameter worden widgetwaarden van het doelnotitieblok ingesteld. Als het notebook dat u uitvoert een widget heeft met de naam A
en u een sleutel-waardepaar ("A": "B")
doorgeeft als onderdeel van de parameter argumenten aan de run()
aanroep, wordt het ophalen van de waarde van de widget A
geretourneerd "B"
. U vindt de instructies voor het maken en werken met widgets in het artikel databricks-widgets .
Notitie
- De
arguments
parameter accepteert alleen Latijnse tekens (ASCII-tekenset). Als u niet-ASCII-tekens gebruikt, wordt een fout geretourneerd. - Taken die zijn gemaakt met behulp van de
dbutils.notebook
API, moeten binnen 30 dagen of minder worden voltooid.
run
Gebruik
Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
run
Voorbeeld
Stel dat u een notitieblok hebt met de naam workflows
foo
van een widget waarmee de waarde van de widget wordt afgedrukt:
dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))
Het uitvoeren dbutils.notebook.run("workflows", 60, {"foo": "bar"})
produceert het volgende resultaat:
De widget had de waarde die u hebt doorgegeven met behulp van dbutils.notebook.run()
, "bar"
in plaats van de standaardwaarde.
exit(value: String): void
Sluit een notitieblok af met een waarde. Als u een notebook aanroept met behulp van de run
methode, is dit de geretourneerde waarde.
dbutils.notebook.exit("returnValue")
Als u een taak aanroept dbutils.notebook.exit
, wordt het notitieblok voltooid. Als u wilt dat de taak mislukt, genereert u een uitzondering.
Voorbeeld
In het volgende voorbeeld geeft u argumenten door aan DataImportNotebook
en voert u verschillende notebooks (DataCleaningNotebook
of ErrorHandlingNotebook
) uit op basis van het resultaat van DataImportNotebook
.
Wanneer de code wordt uitgevoerd, wordt er een tabel weergegeven met een koppeling naar het actieve notebook:
Als u de uitvoeringsdetails wilt weergeven, klikt u op de koppeling Begintijd in de tabel. Als de uitvoering is voltooid, kunt u ook de details van de uitvoering bekijken door op de koppeling Eindtijd te klikken.
Geef gestructureerde gegevens door
In deze sectie ziet u hoe u gestructureerde gegevens tussen notebooks doorgeeft.
Python
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.
## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))
# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": "my_data"
}))
## In caller notebook
import json
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
Scala
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.
/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
/** In callee notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))
/** In caller notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
Afhandeling van fouten
In deze sectie ziet u hoe u fouten kunt afhandelen.
Python
# Errors throw a WorkflowException.
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
num_retries = 0
while True:
try:
return dbutils.notebook.run(notebook, timeout, args)
except Exception as e:
if num_retries > max_retries:
raise e
else:
print("Retrying error", e)
num_retries += 1
run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
Scala
// Errors throw a WorkflowException.
import com.databricks.WorkflowException
// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
var numTries = 0
while (true) {
try {
return dbutils.notebook.run(notebook, timeout, args)
} catch {
case e: WorkflowException if numTries < maxTries =>
println("Error, retrying: " + e)
}
numTries += 1
}
"" // not reached
}
runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
Gelijktijdig meerdere notebooks uitvoeren
U kunt meerdere notebooks tegelijk uitvoeren met behulp van standaard-Scala- en Python-constructies zoals Threads (Scala, Python) en Futures (Scala, Python). In de voorbeeldnotebooks wordt gedemonstreerd hoe u deze constructies gebruikt.
- Download de volgende 4 notebooks. De notebooks worden geschreven in Scala.
- Importeer de notebooks in één map in de werkruimte.
- Voer het notebook Gelijktijdig uitvoeren uit.