Uruchamianie notesu usługi Databricks z innego notesu
Ważne
W przypadku orkiestracji notesu użyj zadań usługi Databricks. W przypadku scenariuszy modularyzacji kodu użyj plików obszaru roboczego. Należy używać technik opisanych w tym artykule tylko wtedy, gdy nie można zaimplementować przypadku użycia przy użyciu zadania usługi Databricks, takiego jak w przypadku pętli notesów w dynamicznym zestawie parametrów lub jeśli nie masz dostępu do plików obszaru roboczego. Aby uzyskać więcej informacji, zobacz Planowanie i organizowanie przepływów pracy oraz udostępnianie kodu.
Porównanie i %run
dbutils.notebook.run()
Polecenie %run
umożliwia dołączenie innego notesu do notesu. Możesz użyć %run
polecenia , aby modularyzować kod, na przykład umieszczając funkcje pomocnicze w osobnym notesie. Można go również użyć do łączenia notesów, które implementują kroki w analizie. Gdy używasz %run
metody , wywoływany notes jest natychmiast wykonywany, a funkcje i zmienne zdefiniowane w nim stają się dostępne w notesie wywołującym.
Interfejs dbutils.notebook
API jest uzupełnieniem %run
, ponieważ umożliwia przekazywanie parametrów do i zwracanie wartości z notesu. Dzięki temu można tworzyć złożone przepływy pracy i potoki z zależnościami. Możesz na przykład pobrać listę plików w katalogu i przekazać nazwy do innego notesu, co nie jest możliwe za pomocą polecenia %run
. Możesz również utworzyć przepływy pracy if-then-else na podstawie wartości zwracanych lub wywołać inne notesy przy użyciu ścieżek względnych.
dbutils.notebook.run()
W przeciwieństwie do %run
metody metoda uruchamia nowe zadanie uruchamiania notesu.
Te metody, podobnie jak wszystkie dbutils
interfejsy API, są dostępne tylko w językach Python i Scala. Można jednak użyć dbutils.notebook.run()
polecenia , aby wywołać notes języka R.
Użyj %run
polecenia , aby zaimportować notes
W tym przykładzie pierwszy notes definiuje funkcję , reverse
która jest dostępna w drugim notesie po użyciu %run
funkcji magic do wykonania shared-code-notebook
.
Ponieważ oba te notesy znajdują się w tym samym katalogu w obszarze roboczym, użyj prefiksu ./
w pliku , ./shared-code-notebook
aby wskazać, że ścieżka powinna zostać rozpoznana względem aktualnie uruchomionego notesu. Notesy można organizować w katalogach, takich jak , lub używać ścieżki bezwzględnej, takiej jak %run ./dir/notebook
%run /Users/username@organization.com/directory/notebook
.
Uwaga
%run
musi znajdować się w komórce, ponieważ uruchamia cały notes w tekście.- Nie można użyć
%run
polecenia , aby uruchomić plik języka Python iimport
jednostki zdefiniowane w tym pliku w notesie. Aby zaimportować z pliku w języku Python, zobacz Modularyzowanie kodu przy użyciu plików. Możesz też spakować plik do biblioteki języka Python, utworzyć bibliotekę usługi Azure Databricks z tej biblioteki języka Python i zainstalować bibliotekę w klastrze używanym do uruchamiania notesu. - Gdy używasz
%run
do uruchamiania notesu zawierającego widżety, domyślnie określony notes jest uruchamiany z wartościami domyślnymi widżetu. Możesz również przekazać wartości do widżetów; zobacz Używanie widżetów usługi Databricks z %run.
dbutils.notebook
API
Metody dostępne w interfejsie dbutils.notebook
API to run
i exit
. Zarówno parametry, jak i wartości zwracane muszą być ciągami.
run(path: String, timeout_seconds: int, arguments: Map): String
Uruchom notes i zwróć jego wartość zakończenia. Metoda uruchamia efemeryczne zadanie, które jest uruchamiane natychmiast.
timeout_seconds
Parametr steruje limitem czasu przebiegu (0 oznacza brak limitu czasu): wywołanie run
zgłasza wyjątek, jeśli nie zostanie zakończone w określonym czasie. Jeśli usługa Azure Databricks nie działa przez ponad 10 minut, uruchomienie notesu kończy się niepowodzeniem niezależnie od timeout_seconds
.
Parametr arguments
ustawia wartości widżetu notesu docelowego. W szczególności jeśli uruchomiony notes ma widżet o nazwie A
i przekazujesz parę ("A": "B")
klucz-wartość jako część parametru run()
argumentów do wywołania, pobieranie wartości widżetu A
zwróci wartość "B"
. Instrukcje dotyczące tworzenia widżetów i pracy z nimi można znaleźć w artykule Dotyczącym widżetów usługi Databricks.
Uwaga
- Parametr
arguments
akceptuje tylko znaki łacińskie (zestaw znaków ASCII). Użycie znaków innych niż ASCII zwraca błąd. - Zadania utworzone przy użyciu interfejsu
dbutils.notebook
API muszą zostać ukończone w ciągu 30 dni lub mniej.
run
Zwyczaj
Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
run
Przykład
Załóżmy, że masz notes o nazwie workflows
z widżetem o nazwie foo
, który drukuje wartość widżetu:
dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))
Uruchomienie dbutils.notebook.run("workflows", 60, {"foo": "bar"})
powoduje następujący wynik:
Widżet miał przekazaną wartość przy użyciu elementu dbutils.notebook.run()
, "bar"
a nie wartości domyślnej.
exit(value: String): void
Zamknij notes z wartością. Jeśli wywołasz notes przy użyciu run
metody , jest to zwracana wartość.
dbutils.notebook.exit("returnValue")
Wywołanie dbutils.notebook.exit
w zadaniu powoduje pomyślne ukończenie notesu. Jeśli chcesz spowodować niepowodzenie zadania, wyrzuć wyjątek.
Przykład
W poniższym przykładzie przekazujesz argumenty do DataImportNotebook
i uruchamiasz różne notesy (DataCleaningNotebook
lub ErrorHandlingNotebook
) na podstawie wyniku z .DataImportNotebook
Po uruchomieniu kodu zostanie wyświetlona tabela zawierająca link do uruchomionego notesu:
Aby wyświetlić szczegóły przebiegu, kliknij link Godzina rozpoczęcia w tabeli. Jeśli przebieg zostanie ukończony, możesz również wyświetlić szczegóły przebiegu, klikając link Czas zakończenia.
Przekazywanie danych strukturalnych
W tej sekcji pokazano, jak przekazywać dane ustrukturyzowane między notesami.
Python
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.
## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))
# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": "my_data"
}))
## In caller notebook
import json
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
Scala
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.
/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
/** In callee notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))
/** In caller notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
Obsługa błędów
W tej sekcji pokazano, jak obsługiwać błędy.
Python
# Errors throw a WorkflowException.
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
num_retries = 0
while True:
try:
return dbutils.notebook.run(notebook, timeout, args)
except Exception as e:
if num_retries > max_retries:
raise e
else:
print("Retrying error", e)
num_retries += 1
run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
Scala
// Errors throw a WorkflowException.
import com.databricks.WorkflowException
// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
var numTries = 0
while (true) {
try {
return dbutils.notebook.run(notebook, timeout, args)
} catch {
case e: WorkflowException if numTries < maxTries =>
println("Error, retrying: " + e)
}
numTries += 1
}
"" // not reached
}
runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
Współbieżne uruchamianie wielu notesów
Jednocześnie można uruchamiać wiele notesów przy użyciu standardowych konstrukcji Języka Scala i Python, takich jak Threads (Scala, Python) i Futures (Scala, Python). W przykładowych notesach pokazano, jak używać tych konstrukcji.
- Pobierz następujące 4 notesy. Notesy są napisane w języku Scala.
- Zaimportuj notesy do jednego folderu w obszarze roboczym.
- Uruchom notes Uruchom współbieżnie.