Importowanie modułów języka Python z folderów git lub plików obszaru roboczego
Kod w języku Python można przechowywać w folderach Databricks Git lub w plikach przestrzeni roboczej , a następnie zaimportować go do potoków DLT. Aby uzyskać więcej informacji na temat pracy z modułami w folderach Git lub plikach obszaru roboczego, zobacz Praca z modułami python i R.
Notatka
Nie można zaimportować kodu źródłowego z notatnika przechowywanego w folderze Git w Databricks lub pliku przestrzeni roboczej. Zamiast tego możesz dodać notatnik bezpośrednio podczas tworzenia lub edytowania potoku. Zobacz Skonfiguruj pipelinę DLT.
Importowanie modułu języka Python do potoku DLT
W poniższym przykładzie pokazano importowanie zapytań zestawu danych jako modułów języka Python z plików obszaru roboczego. Mimo że w tym przykładzie opisano używanie plików obszaru roboczego do przechowywania kodu źródłowego potoku, można go także użyć z kodem źródłowym przechowywanym w folderze Git.
Aby uruchomić ten przykład, wykonaj następujące kroki:
Kliknij ikonę
Obszar roboczy na pasku bocznym obszaru roboczego usługi Azure Databricks, aby otworzyć przeglądarkę obszarów roboczych.
Użyj przeglądarki obszaru roboczego, aby wybrać katalog dla modułów języka Python.
Kliknij
w prawej kolumnie wybranego katalogu i kliknij Utwórz plik >.
Wprowadź nazwę pliku, na przykład
clickstream_raw_module.py
. Zostanie otwarty edytor plików. Aby utworzyć moduł odczytu danych źródłowych do tabeli, wprowadź następujące polecenie w oknie edytora:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Aby utworzyć moduł, który tworzy nową tabelę zawierającą przygotowane dane, utwórz nowy plik w tym samym katalogu, wprowadź nazwę pliku, na przykład
clickstream_prepared_module.py
i wprowadź następujące polecenie w nowym oknie edytora:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Następnie utwórz notatnik pipeline'u. Przejdź do strony docelowej usługi Azure Databricks i wybierz pozycję Utwórz noteslub kliknij pozycję
Nowy na pasku bocznym i wybierz pozycję Notes. Notes można również utworzyć w przeglądarce obszaru roboczego, klikając menu
i klikając pozycję Utwórz notes >.
Nadaj nazwę notesowi i upewnij się, że Python jest językiem domyślnym.
Kliknij Utwórz.
Wprowadź przykładowy kod w notesie.
Notatka
Jeśli notebook importuje moduły lub pakiety ze ścieżki plików obszaru roboczego lub ścieżki folderów Git innej niż katalog notebooka, należy ręcznie dołączyć ścieżkę do plików przy użyciu
sys.path.append()
.Jeśli importujesz plik z folderu Git, musisz dodać na początku
/Workspace/
na początku ścieżki. Na przykładsys.path.append('/Workspace/...')
. Pominięcie/Workspace/
ze ścieżki powoduje wystąpienie błędu.Jeśli moduły lub pakiety są przechowywane w tym samym katalogu co notes, nie trzeba ręcznie dołączać ścieżki. Nie trzeba również ręcznie dołączać ścieżki podczas importowania z katalogu głównego folderu Git, ponieważ katalog główny jest automatycznie dołączany do ścieżki.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("catalog_name.schema_name.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Zastąp
<module-path>
ścieżką do katalogu zawierającego moduły języka Python do zaimportowania.Utwórz potok przy użyciu nowego notesu.
Aby uruchomić potok, na stronie szczegóły potoku kliknij przycisk Start.
Możesz również zaimportować kod języka Python jako pakiet. Poniższy fragment kodu z notesu DLT importuje pakiet test_utils
z katalogu dlt_packages
wewnątrz tego samego katalogu co notes. Katalog dlt_packages
zawiera pliki test_utils.py
i __init__.py
, a test_utils.py
definiuje create_test_table()
funkcji :
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)