Sdílet prostřednictvím


Import modulů Pythonu ze složek Gitu nebo souborů pracovního prostoru

Kód Pythonu můžete uložit do složek Gitu Databricks nebo do souborů pracovního prostoru a pak tento kód Pythonu importovat do kanálů DLT. Další informace o práci s moduly ve složkách Gitu nebo souborech pracovních prostorů najdete v tématu Práce s moduly Pythonu a R.

Poznámka

Zdrojový kód nemůžete importovat z poznámkového bloku uloženého ve složce Databricks Git nebo ze souboru pracovního prostoru. Místo toho přidejte poznámkový blok přímo, když vytváříte nebo upravujete kanál. Viz Konfigurace potrubí DLT.

Import modulu Python do kanálu DLT

Následující příklad ukazuje, jak importovat dotazy na datovou sadu ve formě modulů programu Python ze souborů pracovního prostředí. Ačkoli tento příklad popisuje použití souborů pracovního prostoru pro uložení zdrojového kódu pipeline, můžete tento postup použít se zdrojovým kódem uloženým ve složce Git.

Pokud chcete spustit tento příklad, použijte následující kroky:

  1. Klikněte na ikonu pracovních prostorů ve vašem pracovním prostoru Azure Databricks na postranním panelu, abyste otevřeli prohlížeč pracovního prostoru.

  2. Pomocí prohlížeče pracovního prostoru vyberte adresář pro moduly Pythonu.

  3. Klikněte na nabídku Kebab v nejpravějším sloupci vybraného adresáře a klikněte na Vytvořit > Soubor.

  4. Zadejte název souboru, například clickstream_raw_module.py. Otevře se editor souborů. Pokud chcete vytvořit modul pro čtení zdrojových dat do tabulky, zadejte do okna editoru následující:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. Pokud chcete vytvořit modul, který vytvoří novou tabulku obsahující připravená data, vytvořte nový soubor ve stejném adresáři, zadejte název souboru, například clickstream_prepared_module.pya do nového okna editoru zadejte následující:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. Dále vytvořte pipeline notebook. Přejděte na úvodní stránku Azure Databricks a vyberte Vytvořit notebook, nebo klikněte na Ikona NovýNový v bočním panelu a vyberte Notebook. Poznámkový blok můžete vytvořit také v prohlížeči pracovního prostoru kliknutím na nabídku Kebab a kliknutím na Vytvořit > Poznámkový blok.

  7. Pojmenujte poznámkový blok a potvrďte, že Python je výchozím jazykem.

  8. Klikněte na Vytvořit.

  9. Do poznámkového bloku zadejte ukázkový kód.

    Poznámka

    Pokud váš poznámkový blok importuje moduly nebo balíčky z cesty k souborům pracovního prostoru nebo z cesty ke složkám Git, které se liší od adresáře poznámkového bloku, musíte ručně připojit cestu k souborům pomocí sys.path.append().

    Pokud importujete soubor ze složky Git, musíte předřadit /Workspace/ k cestě. Například sys.path.append('/Workspace/...'). Vynechání /Workspace/ z cesty způsobí chybu.

    Pokud jsou moduly nebo balíčky uložené ve stejném adresáři jako poznámkový blok, nemusíte cestu připojovat ručně. Při importu z kořenového adresáře složky Git také nemusíte cestu přidávat ručně, protože kořenový adresář se k cestě automaticky připojí.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("catalog_name.schema_name.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Nahraďte <module-path> cestou k adresáři obsahujícímu moduly Pythonu, které se mají importovat.

  10. Vytvořte potrubí pomocí nového poznámkového bloku.

  11. Na stránce Podrobnosti potrubí klikněte na Start, abyste spustili potrubí.

Kód Pythonu můžete také importovat jako balíček. Následující fragment kódu z poznámkového bloku DLT naimportuje balíček test_utils z adresáře dlt_packages ve stejném adresáři jako poznámkový blok. Adresář dlt_packages obsahuje soubory test_utils.py a __init__.pya test_utils.py definuje funkci create_test_table():

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)