共用方式為


從 Git 資料夾或工作區檔案匯入 Python 模組

您可以將 Python 程式代碼儲存在 Databricks Git 資料夾中,工作區檔案中,然後將該 Python 程式代碼匯入至您的 Delta Live Tables 管線。 如需在 Git 資料夾或工作區檔案中使用模組的詳細資訊,請參閱 使用 Python 和 R 模組

注意

您無法從儲存在 Databricks Git 資料夾或工作區檔案中的筆記本匯入原始程式碼。 相反地,當您建立或編輯管線時,請直接新增筆記本。 請參閱 設定 Delta Live Tables 資料管線

將 Python 模組匯入至 Delta Live Tables 管線

下列範例示範從工作區檔案將數據集查詢匯入為 Python 模組。 雖然此範例描述如何使用工作區檔案來儲存管線原始程式碼,但您可以將它與儲存在 Git 資料夾中的原始程式碼搭配使用。

若要執行此範例,請使用下列步驟:

  1. 按兩下 工作區圖示Azure Databricks 工作區提要欄位中的 [工作區 ],以開啟工作區瀏覽器。

  2. 使用工作區瀏覽器來選取 Python 模組的目錄。

  3. 按一下選取目錄最右邊資料行中的 [Kebab] 選單,然後按一下 [建立 > 檔案]

  4. 輸入檔案名稱, 例如 clickstream_raw_module.py。 檔案編輯器隨即開啟。 若要建立模組以將源數據讀入資料表,請在編輯器視窗中輸入下列內容:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. 若要建立包含備妥數據的新數據表的模組,請在相同的目錄中建立新的檔案,輸入檔案的名稱,例如,clickstream_prepared_module.py,然後在新的編輯器視窗中輸入下列內容:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. 接下來,建立管線筆記本。 移至您的 Azure Databricks 首頁,然後選取 [建立筆記本],或按下側邊欄中的 [新增圖示][新增],然後選取 [筆記本]。 您也可以按下 Kebab 功能表 並按兩下 [ 建立 > 筆記本],在工作區瀏覽器中建立筆記本

  7. 為您的筆記本命名,並確認 Python 是預設語言。

  8. 按一下 [建立]。

  9. 在筆記本中輸入範例程序代碼。

    注意

    如果您的筆記本從工作區檔案路徑或 Git 資料夾路徑匯入模組或套件,則您必須使用 sys.path.append()手動將路徑附加至檔案。

    如果您要從 Git 資料夾匯入檔案,則必須在路徑前面加上 /Workspace/ 。 例如: sys.path.append('/Workspace/...') 。 省略 /Workspace/ 路徑會導致錯誤。

    如果模組或套件儲存在與筆記本相同的目錄中,您就不需要手動附加路徑。 從 Git 資料夾的根目錄匯入時,您也不需要手動附加路徑,因為根目錄會自動附加至路徑。

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("LIVE.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    將取代 <module-path> 為包含要匯入之 Python 模組之目錄的路徑。

  10. 使用新的筆記本建立管線。

  11. 若要執行管線,請在 [管線詳細數據 ] 頁面中,按兩下 [ 開始]。

您也可以將 Python 程式代碼匯入為套件。 Delta Live Tables 筆記本中的下列代碼段會從與筆記本位於相同目錄內的 test_utils 目錄匯入 dlt_packages 套件。 目錄 dlt_packages 包含 檔案 test_utils.py__init__.py,並 test_utils.py 定義 函式 create_test_table()

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)