從 Git 資料夾或工作區檔案匯入 Python 模組
您可以將 Python 程式代碼儲存在 Databricks Git 資料夾中, 或 工作區檔案中,然後將該 Python 程式代碼匯入至您的 Delta Live Tables 管線。 如需在 Git 資料夾或工作區檔案中使用模組的詳細資訊,請參閱 使用 Python 和 R 模組。
注意
您無法從儲存在 Databricks Git 資料夾或工作區檔案中的筆記本匯入原始程式碼。 相反地,當您建立或編輯管線時,請直接新增筆記本。 請參閱 設定 Delta Live Tables 資料管線。
將 Python 模組匯入至 Delta Live Tables 管線
下列範例示範從工作區檔案將數據集查詢匯入為 Python 模組。 雖然此範例描述如何使用工作區檔案來儲存管線原始程式碼,但您可以將它與儲存在 Git 資料夾中的原始程式碼搭配使用。
若要執行此範例,請使用下列步驟:
按兩下 Azure Databricks 工作區提要欄位中的 [工作區 ],以開啟工作區瀏覽器。
使用工作區瀏覽器來選取 Python 模組的目錄。
按一下選取目錄最右邊資料行中的 [,然後按一下 [建立 > 檔案]。
輸入檔案名稱, 例如
clickstream_raw_module.py
。 檔案編輯器隨即開啟。 若要建立模組以將源數據讀入資料表,請在編輯器視窗中輸入下列內容:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
若要建立包含備妥數據的新數據表的模組,請在相同的目錄中建立新的檔案,輸入檔案的名稱,例如,
clickstream_prepared_module.py
,然後在新的編輯器視窗中輸入下列內容:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
接下來,建立管線筆記本。 移至您的 Azure Databricks 首頁,然後選取 [建立筆記本],或按下側邊欄中的 [新增],然後選取 [筆記本]。 您也可以按下 並按兩下 [ 建立 > 筆記本],在工作區瀏覽器中建立筆記本。
為您的筆記本命名,並確認 Python 是預設語言。
按一下 [建立]。
在筆記本中輸入範例程序代碼。
注意
如果您的筆記本從工作區檔案路徑或 Git 資料夾路徑匯入模組或套件,則您必須使用
sys.path.append()
手動將路徑附加至檔案。如果您要從 Git 資料夾匯入檔案,則必須在路徑前面加上
/Workspace/
。 例如:sys.path.append('/Workspace/...')
。 省略/Workspace/
路徑會導致錯誤。如果模組或套件儲存在與筆記本相同的目錄中,您就不需要手動附加路徑。 從 Git 資料夾的根目錄匯入時,您也不需要手動附加路徑,因為根目錄會自動附加至路徑。
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("LIVE.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
將取代
<module-path>
為包含要匯入之 Python 模組之目錄的路徑。使用新的筆記本建立管線。
若要執行管線,請在 [管線詳細數據 ] 頁面中,按兩下 [ 開始]。
您也可以將 Python 程式代碼匯入為套件。 Delta Live Tables 筆記本中的下列代碼段會從與筆記本位於相同目錄內的 test_utils
目錄匯入 dlt_packages
套件。 目錄 dlt_packages
包含 檔案 test_utils.py
和 __init__.py
,並 test_utils.py
定義 函式 create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)