從 Git 資料夾或工作區檔案匯入 Python 模組
您可以將 Python 程式代碼儲存在 Databricks Git 資料夾中, 或 工作區檔案,然後將該 Python 程式代碼匯入 DLT 管線。 如需在 Git 資料夾或工作區檔案中使用模組的詳細資訊,請參閱 使用 Python 和 R 模組。
注意
您無法從儲存在 Databricks Git 資料夾或工作區檔案中的筆記本匯入原始程式碼。 相反地,當您建立或編輯管線時,請直接新增筆記本。 請參閱 設定 DLT 管線。
將 Python 模組匯入 DLT 管線
下列範例示範從工作區檔案將數據集查詢匯入為 Python 模組。 雖然此範例描述如何使用工作區檔案來儲存管線原始程式碼,但您可以將它與儲存在 Git 資料夾中的原始程式碼搭配使用。
若要執行此範例,請使用下列步驟:
在 Azure Databricks 工作區的側邊欄中,點擊
工作區,以打開工作區瀏覽器。
使用工作區瀏覽器來選取 Python 模組的目錄。
點擊選取目錄最右邊資料行中的
,然後點擊 建立 > 檔案。
輸入檔案名稱,例如,
clickstream_raw_module.py
。 檔案編輯器隨即開啟。 若要建立模組以將源數據讀入資料表,請在編輯器視窗中輸入下列內容:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
若要建立包含備妥數據的新數據表的模組,請在相同的目錄中建立新的檔案,輸入檔案的名稱,例如,
clickstream_prepared_module.py
,然後在新的編輯器視窗中輸入下列內容:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
接下來,建立工作流程筆記本。 移至您的 Azure Databricks 登陸頁面,然後選取 [建立筆記本],或按下提要字段中
[新增],然後選取 [Notebook]。 您也可以單擊 [
,然後按兩下 [建立 > Notebook],在工作區瀏覽器中建立筆記本。
為您的筆記本命名,並確認 Python 是預設語言。
按一下建立。
在筆記本中輸入範例程序代碼。
注意
如果您的筆記型電腦從與筆記本目錄不同的工作區檔案路徑或 Git 資料夾路徑匯入模組或套件,則您必須使用
sys.path.append()
手動將路徑附加至檔案。如果您要從 Git 資料夾匯入檔案,則必須在路徑前面加上
/Workspace/
。 例如,sys.path.append('/Workspace/...')
。 當路徑中省略/Workspace/
時,會導致錯誤。如果模組或套件儲存在與筆記本相同的目錄中,您就不需要手動附加路徑。 從 Git 資料夾的根目錄匯入時,您也不需要手動附加路徑,因為根目錄會自動附加至路徑。
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("catalog_name.schema_name.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
將
<module-path>
取代為包含要匯入之 Python 模組之目錄的路徑。使用新的筆記本建立管線。
若要執行資料管道,請在 管道詳細資料 頁面中,按一下 開始。
您也可以將 Python 程式代碼匯入為套件。 DLT Notebook 中的下列代碼段會從與筆記本位於相同目錄內的 dlt_packages
目錄匯入 test_utils
套件。
dlt_packages
目錄包含檔案 test_utils.py
和 __init__.py
,test_utils.py
定義函式 create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)