從另一個 Notebook 執行 Databricks Notebook
重要
針對 Notebook 協調流程,請使用 Databricks 作業。 針對 Code 模組化案例,請使用工作區檔案。 只有當無法使用 Databricks 作業實作使用案例時,才應該使用本文所述的技術,例如,在動態
%run
與 dbutils.notebook.run()
的比較
%run
命令可讓您在 Notebook 中包含另一個 Notebook。 您可以使用 %run
將支援函數放在個別的 Notebook 中,以將 Code 模組化。 您也可以使用於在分析中串連實作步驟的 Notebook。 當您使用 %run
時,會立即執行呼叫的 Notebook,並在其中定義的函數與變數會在呼叫 Notebook 時可供使用。
dbutils.notebook
API 是 %run
的補充,因為它可讓您將 parameters 傳遞到筆記本並從中返回 values。 這可讓您組建具有相依性的複雜工作流程和管線。 例如,您可以 get 目錄中的檔案 list,並將名稱傳遞至另一個筆記本,%run
無法使用。 您也可以根據傳回 values 建立 if-then-else 工作流程,或使用相對路徑呼叫其他筆記本。
不同於 %run
,方法 dbutils.notebook.run()
會啟動新的作業來執行 Notebook。
就像所有 dbutils
API 一樣,這些方法只能用於 Python 和 Scala。 不過,您可以使用 dbutils.notebook.run()
來叫用 R Notebook。
使用 %run
匯入 Notebook
在此範例中,第一個 Notebook 會定義函數 reverse
,在您使用 %run
magic 執行 shared-code-notebook
之後,其可用於第二個 Notebook。
由於這兩個Notebook都位於工作區中的相同目錄中,因此請使用 ./
中的前置詞 ./shared-code-notebook
標記路徑應該相對於目前執行中的 Notebook 解析。 您可以將 Notebook 組織成目錄,例如 %run ./dir/notebook
,或使用像 %run /Users/username@organization.com/directory/notebook
之類的絕對路徑。
注意
-
%run
必須單獨在儲存格中,因為它會執行整個 Notebook 內嵌。 - 您無法使用
%run
將 Python 檔案,與import
該檔案中定義的實體執行到 Notebook。 若要從 Python 檔案匯入,請參閱使用檔案將程式碼模組化。 或者,將檔案封裝到 Python 程式庫、從該 Python 程式庫建立 Azure Databricks 程式庫,並將程式庫安裝到您用來執行 Notebook 的叢集中。 - 當您使用
%run
來執行包含小工具的筆記本時,根據預設,指定的筆記本會以小工具的預設 values執行。 您也可以將 values 傳遞至小工具;請參閱 使用 Databricks 小工具結合 %run。
dbutils.notebook
API
dbutils.notebook
API 所提供的方法是 run
與 exit
。
parameters 和傳回 values 必須是字串。
run(path: String, timeout_seconds: int, arguments: Map): String
執行 Notebook 並傳回其結束值。 該方法會啟動立即執行的暫時作業。
timeout_seconds
參數會控制執行的逾時 (0 表示沒有逾時):如果在指定時間內未完成,則對 run
的呼叫會擲回例外狀況。 如果 Azure Databricks 已關閉超過 10 分鐘,則不論 timeout_seconds
為何,Notebook 執行都會失敗。
arguments
參數會設定目標筆記本中的小工具 values。 具體來說,如果您正在執行的 Notebook 有名稱為 A
的小工具,而且您會將索引鍵/值組 ("A": "B")
當做引數參數的一部分傳遞給 run()
呼叫,則擷取小工具 A
的值會傳回 "B"
。 您可以在 Databricks 小工具一文中找到建立和使用小工具的指示。
注意
-
arguments
參數只接受拉丁字元(ASCII 字元 set)。 使用非 ASCII 字元會傳回錯誤。 - 使用
dbutils.notebook
API 建立的工作必須在 30 天內完成。
run
使用方式
Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
run
範例
假設您有名稱為 workflows
的 Notebook,其中包含會列印小工具值且名稱為 foo
的小工具:
dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))
執行 dbutils.notebook.run("workflows", 60, {"foo": "bar"})
會產生下列結果:
小工具有您使用 dbutils.notebook.run()
、"bar"
傳遞的值,而不是預設值。
exit(value: String): void
以一個值退出 Notebook。 如果您使用 run
方法呼叫 Notebook,這是傳回的值。
dbutils.notebook.exit("returnValue")
在作業中呼叫 dbutils.notebook.exit
會導致 Notebook 順利完成。 如果您想要讓作業失敗,請擲回例外狀況。
範例
在下列範例中,您會根據來自 DataImportNotebook
的結果,將引數傳遞給 DataCleaningNotebook
並執行不同的Notebook (ErrorHandlingNotebook
或 DataImportNotebook
)。
當程式代碼執行時,table 會出現,其中包含執行中筆記本的連結:
若要檢視執行詳細數據,請按兩下 table中 開始時間 連結。 如果執行完成,您也可以按兩下[結束時間]連結,檢視詳細的執行資料。
傳遞結構化資料
本章節說明如何在 Notebook 之間傳遞結構化資料。
Python
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.
## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))
# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": "my_data"
}))
## In caller notebook
import json
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
Scala
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.
/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
/** In callee notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))
/** In caller notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
處理錯誤
本章節說明如何處理錯誤。
Python
# Errors throw a WorkflowException.
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
num_retries = 0
while True:
try:
return dbutils.notebook.run(notebook, timeout, args)
except Exception as e:
if num_retries > max_retries:
raise e
else:
print("Retrying error", e)
num_retries += 1
run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
Scala
// Errors throw a WorkflowException.
import com.databricks.WorkflowException
// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
var numTries = 0
while (true) {
try {
return dbutils.notebook.run(notebook, timeout, args)
} catch {
case e: WorkflowException if numTries < maxTries =>
println("Error, retrying: " + e)
}
numTries += 1
}
"" // not reached
}
runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
同時執行多個 Notebook
您可以使用標準 Scala 和 Python 建構,例如執行緒 (Scala、Python) 和 Futures (Scala、Python),同時執行多個 Notebook。 範例 Notebook 會示範如何使用這些建構。
- 下載下列 4 個 Notebook。 Notebook 是以 Scala 撰寫。
- 將 Notebook 匯入工作區中的單一資料夾。
- 執行同時執行 Notebook。