Importar módulos Python de pastas Git ou arquivos de espaço de trabalho
Você pode armazenar código Python em pastas Git Databricks ou em arquivos de espaço de trabalho e, em seguida, importar esse código Python para seus pipelines DLT. Para obter mais informações sobre como trabalhar com módulos em pastas Git ou arquivos de espaço de trabalho, consulte Trabalhar com módulos Python e R.
Observação
Não é possível importar o código-fonte de um bloco de anotações armazenado em uma pasta do Databricks Git ou em um arquivo de espaço de trabalho. Em vez disso, adicione o bloco de anotações diretamente ao criar ou editar o pipeline. Consulte Configurar um pipeline de DLT.
Importar um módulo Python para um pipeline DLT
O exemplo a seguir demonstra a importação de consultas de conjunto de dados como módulos Python de arquivos de espaço de trabalho. Embora este exemplo descreva o uso de arquivos de espaço de trabalho para armazenar o código-fonte do pipeline, você pode usá-lo com o código-fonte armazenado em uma pasta Git.
Para executar este exemplo, use as seguintes etapas:
Clique no ícone
Espaço de trabalho na barra lateral do seu espaço de trabalho do Azure Databricks para abrir o navegador de espaços de trabalho.
Use o navegador de espaço de trabalho para selecionar um diretório para os módulos Python.
Clique no menu Kebab
na coluna mais à direita do diretório selecionado e depois em Criar Arquivo >.
Insira um nome para o arquivo, por exemplo,
clickstream_raw_module.py
. O editor de arquivos é aberto. Para criar um módulo para ler dados de origem em uma tabela, digite o seguinte na janela do editor:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Para criar um módulo que cria uma nova tabela contendo dados preparados, crie um novo arquivo no mesmo diretório, insira um nome para o arquivo, por exemplo,
clickstream_prepared_module.py
, e digite o seguinte na nova janela do editor:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Em seguida, crie um caderno de pipeline. Vá para a página de aterrissagem do Azure Databricks e selecione Criar um bloco de anotações ou clique em
Novo na barra lateral e selecione Bloco de Anotações. Você também pode criar o bloco de notas no navegador do espaço de trabalho, clicando no menu Kebab
e depois em Criar Bloco de Notas >.
Dê um nome ao seu bloco de notas e confirme que Python é a linguagem padrão.
Clique Criar.
Insira o código de exemplo no bloco de anotações.
Observação
Se o seu notebook importar módulos ou pacotes de um caminho dos ficheiros do espaço de trabalho ou de um caminho das pastas Git diferente do diretório do notebook, deverá anexar manualmente o caminho aos ficheiros usando
sys.path.append()
.Se estiveres a importar um ficheiro de uma pasta Git, deves acrescentar
/Workspace/
ao caminho. Por exemplo,sys.path.append('/Workspace/...')
. Omitir/Workspace/
do caminho resulta em um erro.Se os módulos ou pacotes estiverem armazenados no mesmo diretório do bloco de anotações, não será necessário acrescentar o caminho manualmente. Você também não precisa acrescentar manualmente o caminho ao importar do diretório raiz de uma pasta Git porque o diretório raiz é automaticamente anexado ao caminho.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("catalog_name.schema_name.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Substitua
<module-path>
pelo caminho para o diretório que contém os módulos Python a serem importados.Crie um pipeline usando o novo caderno de notas.
Para executar o pipeline, na página Detalhes do pipeline , clique no botão Iniciar.
Você também pode importar código Python como um pacote. O seguinte trecho de código de um notebook DLT importa o pacote test_utils
do diretório dlt_packages
localizado no mesmo diretório que o notebook. O diretório dlt_packages
contém os arquivos test_utils.py
e __init__.py
, e test_utils.py
define a função create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)