共用方式為


從 Git 資料夾或工作區檔案匯入 Python 模組

您可以將 Python 程式代碼儲存在 Databricks Git 資料夾中,工作區檔案,然後將該 Python 程式代碼匯入 DLT 管線。 如需在 Git 資料夾或工作區檔案中使用模組的詳細資訊,請參閱 使用 Python 和 R 模組

注意

您無法從儲存在 Databricks Git 資料夾或工作區檔案中的筆記本匯入原始程式碼。 相反地,當您建立或編輯管線時,請直接新增筆記本。 請參閱 設定 DLT 管線

將 Python 模組匯入 DLT 管線

下列範例示範從工作區檔案將數據集查詢匯入為 Python 模組。 雖然此範例描述如何使用工作區檔案來儲存管線原始程式碼,但您可以將它與儲存在 Git 資料夾中的原始程式碼搭配使用。

若要執行此範例,請使用下列步驟:

  1. 在 Azure Databricks 工作區的側邊欄中,點擊 工作區圖示工作區,以打開工作區瀏覽器。

  2. 使用工作區瀏覽器來選取 Python 模組的目錄。

  3. 點擊選取目錄最右邊資料行中的 Kebab 選單,然後點擊 建立 > 檔案

  4. 輸入檔案名稱,例如,clickstream_raw_module.py。 檔案編輯器隨即開啟。 若要建立模組以將源數據讀入資料表,請在編輯器視窗中輸入下列內容:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. 若要建立包含備妥數據的新數據表的模組,請在相同的目錄中建立新的檔案,輸入檔案的名稱,例如,clickstream_prepared_module.py,然後在新的編輯器視窗中輸入下列內容:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. 接下來,建立工作流程筆記本。 移至您的 Azure Databricks 登陸頁面,然後選取 [建立筆記本],或按下提要字段中 的 [新增圖示][新增],然後選取 [Notebook]。 您也可以單擊 [Kebab] 功能表然後按兩下 [建立 > Notebook],在工作區瀏覽器中建立筆記本。

  7. 為您的筆記本命名,並確認 Python 是預設語言。

  8. 按一下建立

  9. 在筆記本中輸入範例程序代碼。

    注意

    如果您的筆記型電腦從與筆記本目錄不同的工作區檔案路徑或 Git 資料夾路徑匯入模組或套件,則您必須使用 sys.path.append()手動將路徑附加至檔案。

    如果您要從 Git 資料夾匯入檔案,則必須在路徑前面加上 /Workspace/。 例如,sys.path.append('/Workspace/...')。 當路徑中省略 /Workspace/ 時,會導致錯誤。

    如果模組或套件儲存在與筆記本相同的目錄中,您就不需要手動附加路徑。 從 Git 資料夾的根目錄匯入時,您也不需要手動附加路徑,因為根目錄會自動附加至路徑。

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("catalog_name.schema_name.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    <module-path> 取代為包含要匯入之 Python 模組之目錄的路徑。

  10. 使用新的筆記本建立管線。

  11. 若要執行資料管道,請在 管道詳細資料 頁面中,按一下 開始

您也可以將 Python 程式代碼匯入為套件。 DLT Notebook 中的下列代碼段會從與筆記本位於相同目錄內的 dlt_packages 目錄匯入 test_utils 套件。 dlt_packages 目錄包含檔案 test_utils.py__init__.pytest_utils.py 定義函式 create_test_table()

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)