แชร์ผ่าน


Import Python modules from Git folders or workspace files

You can store Python code in Databricks Git folders or in workspace files and then import that Python code into your Delta Live Tables pipelines. For more information about working with modules in Git folders or workspace files, see Work with Python and R modules.

Note

You cannot import source code from a notebook stored in a Databricks Git folder or a workspace file. Instead, add the notebook directly when you create or edit a pipeline. See Configure a Delta Live Tables pipeline.

Import a Python module to a Delta Live Tables pipeline

The following example demonstrates importing dataset queries as Python modules from workspace files. Although this example describes using workspace files to store the pipeline source code, you can use it with source code stored in a Git folder.

To run this example, use the following steps:

  1. Click Workspaces Icon Workspace in the sidebar of your Azure Databricks workspace to open the workspace browser.

  2. Use the workspace browser to select a directory for the Python modules.

  3. Click Kebab menu in the rightmost column of the selected directory and click Create > File.

  4. Enter a name for the file, for example, clickstream_raw_module.py. The file editor opens. To create a module to read source data into a table, enter the following in the editor window:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. To create a module that creates a new table containing prepared data, create a new file in the same directory, enter a name for the file, for example, clickstream_prepared_module.py, and enter the following in the new editor window:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. Next, create a pipeline notebook. Go to your Azure Databricks landing page and select Create a notebook, or click New Icon New in the sidebar and select Notebook. You can also create the notebook in the workspace browser by clicking Kebab menu and click Create > Notebook.

  7. Name your notebook and confirm Python is the default language.

  8. Click Create.

  9. Enter the example code in the notebook.

    Note

    If your notebook imports modules or packages from a workspace files path or a Git folders path different from the notebook directory, you must manually append the path to the files using sys.path.append().

    If you are importing a file from a Git folder, you must prepend /Workspace/ to the path. For example, sys.path.append('/Workspace/...'). Omitting /Workspace/ from the path results in an error.

    If the modules or packages are stored in the same directory as the notebook, you do not need to append the path manually. You also do not need to manually append the path when importing from the root directory of a Git folder because the root directory is automatically appended to the path.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("LIVE.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Replace <module-path> with the path to the directory containing the Python modules to import.

  10. Create a pipeline using the new notebook.

  11. To run the pipeline, in the Pipeline details page, click Start.

You can also import Python code as a package. The following code snippet from a Delta Live Tables notebook imports the test_utils package from the dlt_packages directory inside the same directory as the notebook. The dlt_packages directory contains the files test_utils.py and __init__.py, and test_utils.py defines the function create_test_table():

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)