Importera Python-moduler från Git-mappar eller arbetsytefiler
Du kan lagra Python-kod i Databricks Git-mappar eller i arbetsytefiler och sedan importera Python-koden till dina Delta Live-Tables pipelines. Mer information om hur du arbetar med moduler i Git-mappar eller arbetsytefiler finns i Arbeta med Python- och R-moduler.
Kommentar
Du kan inte importera källkod från en notebook-fil som lagras i en Databricks Git-mapp eller en arbetsytefil. Lägg i stället till anteckningsboken direkt när du skapar eller redigerar en pipeline. Se Konfigurera en Delta Live Tables-pipeline.
Importera en Python-modul till en Delta Live-Tables pipeline
I följande exempel visas hur du importerar datamängdsfrågor som Python-moduler från arbetsytefiler. Även om det här exemplet beskriver hur du använder arbetsytefiler för att lagra pipelinens källkod, kan du använda den med källkod som lagras i en Git-mapp.
Använd följande steg för att köra det här exemplet:
Klicka på Arbetsyta i sidofältet på din Azure Databricks-arbetsyta för att öppna webbläsaren för arbetsytan.
Använd arbetsytans webbläsare för att select en katalog för Python-modulerna.
Klicka på kebabmenyn längst till höger column i den valda katalogen och klicka på Skapa > Fil.
Ange ett namn på filen, till exempel
clickstream_raw_module.py
. Filredigeraren öppnas. Om du vill skapa en modul för att läsa källdata i en tableanger du följande i redigeraren window:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Skapa en modul som skapar en ny table som innehåller förberedda data genom att skapa en ny fil i samma katalog, ange ett namn på filen, till exempel
clickstream_prepared_module.py
och ange följande i den nya redigeraren window:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Skapa sedan en pipeline notebook-fil. Gå till din Azure Databricks-landningssida och selectSkapa en notebookeller klicka på Ny i sidofältet och selectNotebook. Du kan också skapa anteckningsboken i arbetsytans webbläsare genom att klicka på Och klicka på Skapa > anteckningsbok.
Namnge anteckningsboken och bekräfta att Python är standardspråket.
Klicka på Skapa.
Ange exempelkoden i notebook-filen.
Kommentar
Om din notebook-fil importerar moduler eller paket från en sökväg för arbetsytor eller en Sökväg för Git-mappar som skiljer sig från notebook-katalogen, måste du lägga till sökvägen till filerna manuellt med hjälp av
sys.path.append()
.Om du importerar en fil från en Git-mapp måste du förbereda
/Workspace/
till sökvägen. Exempel:sys.path.append('/Workspace/...')
Om du utelämnar/Workspace/
sökvägen resulterar det i ett fel.Om modulerna eller paketen lagras i samma katalog som notebook-filen behöver du inte lägga till sökvägen manuellt. Du behöver inte heller lägga till sökvägen manuellt när du importerar från rotkatalogen i en Git-mapp eftersom rotkatalogen läggs till automatiskt i sökvägen.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("LIVE.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Ersätt
<module-path>
med sökvägen till katalogen som innehåller De Python-moduler som ska importeras.Skapa en pipeline med hjälp av den nya notebook-filen.
Om du vill köra pipelinen klickar du på Start på sidan Pipelineinformation.
Du kan också importera Python-kod som ett paket. Följande kodfragment från en Delta Live-Tables notebook importerar test_utils
-paketet från mappen dlt_packages
i samma mapp som notebooken. Katalogen dlt_packages
innehåller filerna test_utils.py
och __init__.py
och test_utils.py
definierar funktionen create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)