Импорт модулей Python из папок Git или файлов рабочей области
Вы можете хранить код Python в папках Git Databricks или в файлах рабочей области, а затем импортировать этот код Python в конвейеры Delta Live Tables. Дополнительные сведения о работе с модулями в папках Git или файлах рабочей области см. в статье "Работа с модулями Python и R".
Примечание.
Исходный код нельзя импортировать из записной книжки, хранящейся в папке Databricks Git или в файле рабочей области. Вместо этого добавьте записную книжку непосредственно при создании или изменении конвейера. См. статью "Настройка конвейера разностных динамических таблиц".
Импорт модуля Python в конвейер Delta Live Tables
В следующем примере показано импорт запросов набора данных в виде модулей Python из файлов рабочей области. Хотя в этом примере описывается использование файлов рабочей области для хранения исходного кода конвейера, его можно использовать с исходным кодом, хранящимся в папке Git.
Чтобы выполнить этот пример, выполните следующие действия.
Щелкните рабочую область на боковой панели рабочей области Azure Databricks, чтобы открыть браузер рабочей области.
Используйте браузер рабочей области для выбора каталога для модулей Python.
Щелкните самый правый столбец выбранного каталога и нажмите кнопку "Создать > файл".
Введите имя файла, например
clickstream_raw_module.py
. Откроется редактор файлов. Чтобы создать модуль для чтения исходных данных в таблицу, введите следующее в окне редактора:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Чтобы создать модуль, создающий новую таблицу, содержащую подготовленные данные, создайте новый файл в том же каталоге, введите имя файла, например
clickstream_prepared_module.py
, и введите следующее в новом окне редактора:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Затем создайте записную книжку конвейера. Перейдите на целевую страницу Azure Databricks и выберите "Создать записную книжку" или нажмите кнопку "Создать" на боковой панели и выберите "Записная книжка". Вы также можете создать записную книжку в браузере рабочей области, нажав кнопку "Создать > записную книжку".
Назовите записную книжку и подтвердите , что Python является языком по умолчанию.
Нажмите кнопку Создать.
Введите пример кода в записной книжке.
Примечание.
Если записная книжка импортирует модули или пакеты из пути к файлам рабочей области или путь к папкам Git, отличный от каталога записной книжки, необходимо вручную добавить путь к файлам с помощью
sys.path.append()
.При импорте файла из папки Git необходимо приставить
/Workspace/
его к пути. Например,sys.path.append('/Workspace/...')
. Пропуск/Workspace/
пути приводит к ошибке.Если модули или пакеты хранятся в том же каталоге, что и записная книжка, вам не нужно добавлять путь вручную. При импорте из корневого каталога папки Git также не требуется вручную добавлять путь, так как корневой каталог автоматически добавляется в путь.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("LIVE.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Замените
<module-path>
путь к каталогу, содержаму модули Python для импорта.Создайте конвейер с помощью новой записной книжки.
Чтобы запустить конвейер, на странице сведений о конвейере нажмите кнопку "Пуск".
Вы также можете импортировать код Python в виде пакета. Следующий фрагмент кода из записной книжки Delta Live Tables импортирует test_utils
пакет из dlt_packages
каталога в том же каталоге, что и записная книжка. Каталог dlt_packages
содержит файлы test_utils.py
и __init__.py
test_utils.py
определяет функциюcreate_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)