Поделиться через


Импорт модулей Python из папок Git или файлов рабочей области

Вы можете хранить код Python в папках Databricks Git или в файлах рабочей области , а затем импортировать этот код Python в конвейеры DLT. Дополнительные сведения о работе с модулями в папках Git или файлах рабочей области см. в разделе Работа с модулями Python и R.

Заметка

Исходный код нельзя импортировать из записной книжки, хранящейся в папке Databricks Git или в файле рабочей области. Вместо этого добавьте записную книжку непосредственно при создании или изменении конвейера. См. настройкаконвейера DLT.

Импорт модуля Python в конвейер DLT

В следующем примере показано импорт запросов набора данных в виде модулей Python из файлов рабочей области. Хотя в этом примере описывается использование файлов рабочей области для хранения исходного кода конвейера, его можно использовать с исходным кодом, хранящимся в папке Git.

Чтобы выполнить этот пример, выполните следующие действия.

  1. Щелкните значок Рабочие областиРабочая область на боковой панели вашего рабочего пространства Azure Databricks, чтобы открыть браузер этого рабочего пространства.

  2. Используйте браузер рабочей области для выбора каталога для модулей Python.

  3. Щелкните меню Kebab в правом столбце выбранного каталога и щелкните Создать > файл.

  4. Введите имя файла, например clickstream_raw_module.py. Откроется редактор файлов. Чтобы создать модуль для чтения исходных данных в таблицу, введите следующее в окне редактора:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. Чтобы создать модуль, создающий новую таблицу, содержащую подготовленные данные, создайте новый файл в том же каталоге, введите имя файла, например clickstream_prepared_module.py, и введите следующее в новом окне редактора:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. Затем создайте записную книжку конвейера. Перейдите на целевую страницу Azure Databricks и выберите Создать записную книжкуили щелкните новый значокСоздать на боковой панели и выберите Записная книжка. Вы также можете создать записную книжку в браузере рабочей области, щелкнув меню Kebab и щелкнув Создать записную книжку > записную книжку.

  7. Назовите записную книжку и убедитесь, что Python является языком по умолчанию.

  8. Щелкните Создать.

  9. Введите пример кода в записной книжке.

    Заметка

    Если ваша записная книжка импортирует модули или пакеты из пути к файлам рабочей области или пути к папкам Git, которые отличаются от каталога записной книжки, необходимо вручную добавить путь к файлам, используя sys.path.append().

    При импорте файла из папки Git необходимо добавить /Workspace/ в начало пути. Например, sys.path.append('/Workspace/...'). Опущение /Workspace/ из пути приводит к ошибке.

    Если модули или пакеты хранятся в том же каталоге, что и записная книжка, вам не нужно добавлять путь вручную. При импорте из корневого каталога папки Git также не требуется вручную добавлять путь, так как корневой каталог автоматически добавляется в путь.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("catalog_name.schema_name.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Замените <module-path> путем к каталогу, содержаму модули Python для импорта.

  10. Создайте конвейер с помощью новой записной книжки.

  11. Чтобы запустить конвейер, на странице сведений конвейера нажмите кнопку "Начать" .

Вы также можете импортировать код Python в виде пакета. Следующий фрагмент кода из записной книжки DLT импортирует пакет test_utils из каталога dlt_packages в том же каталоге, что и записная книжка. Каталог dlt_packages содержит файлы test_utils.py и __init__.py, а test_utils.py определяет функцию create_test_table():

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)