Partilhar via


Importar módulos Python de pastas Git ou arquivos de espaço de trabalho

Você pode armazenar código Python em pastas Git Databricks ou em arquivos de espaço de trabalho e, em seguida, importar esse código Python para seus pipelines Delta Live Tables. Para obter mais informações sobre como trabalhar com módulos em pastas Git ou arquivos de espaço de trabalho, consulte Trabalhar com módulos Python e R.

Nota

Não é possível importar o código-fonte de um bloco de anotações armazenado em uma pasta do Databricks Git ou em um arquivo de espaço de trabalho. Em vez disso, adicione o bloco de anotações diretamente ao criar ou editar um pipeline. Consulte Configurar um pipeline Delta Live Tables.

Importar um módulo Python para um pipeline Delta Live Tables

O exemplo a seguir demonstra a importação de consultas de conjunto de dados como módulos Python de arquivos de espaço de trabalho. Embora este exemplo descreva o uso de arquivos de espaço de trabalho para armazenar o código-fonte do pipeline, você pode usá-lo com o código-fonte armazenado em uma pasta Git.

Para executar este exemplo, use as seguintes etapas:

  1. Clique em Ícone Espaços de trabalho Espaço de trabalho na barra lateral do seu espaço de trabalho do Azure Databricks para abrir o navegador de espaço de trabalho.

  2. Use o navegador de espaço de trabalho para selecionar um diretório para os módulos Python.

  3. Clique Menu de kebab na coluna mais à direita do diretório selecionado e clique em Criar > arquivo.

  4. Insira um nome para o arquivo, por exemplo, clickstream_raw_module.py. O editor de arquivos é aberto. Para criar um módulo para ler dados de origem em uma tabela, digite o seguinte na janela do editor:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. Para criar um módulo que cria uma nova tabela contendo dados preparados, crie um novo arquivo no mesmo diretório, insira um nome para o arquivo, por exemplo, clickstream_prepared_module.pye digite o seguinte na nova janela do editor:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. Em seguida, crie um bloco de anotações de pipeline. Vá para a página inicial do Azure Databricks e selecione Criar um bloco de anotações ou cliqueNovo ícone em Novo na barra lateral e selecione Bloco de Anotações. Você também pode criar o bloco de anotações no navegador de espaço de trabalho clicando e Menu de kebab clicando em Criar > Bloco de Anotações.

  7. Nomeie seu bloco de anotações e confirme se Python é a linguagem padrão.

  8. Clique em Criar.

  9. Insira o código de exemplo no bloco de anotações.

    Nota

    Se o bloco de anotações importar módulos ou pacotes de um caminho de arquivos do espaço de trabalho ou de um caminho de pastas Git diferente do diretório do bloco de anotações, você deverá anexar manualmente o caminho aos arquivos usando sys.path.append()o .

    Se você estiver importando um arquivo de uma pasta Git, deverá preceder /Workspace/ o caminho. Por exemplo, sys.path.append('/Workspace/...'). Omitir /Workspace/ do caminho resulta em um erro.

    Se os módulos ou pacotes estiverem armazenados no mesmo diretório do bloco de anotações, não será necessário acrescentar o caminho manualmente. Você também não precisa acrescentar manualmente o caminho ao importar do diretório raiz de uma pasta Git porque o diretório raiz é automaticamente anexado ao caminho.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("LIVE.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Substitua <module-path> pelo caminho para o diretório que contém os módulos Python a serem importados.

  10. Crie um pipeline usando o novo bloco de anotações.

  11. Para executar o pipeline, na página Detalhes do pipeline, clique em Iniciar.

Você também pode importar código Python como um pacote. O trecho de código a seguir de um bloco de anotações Delta Live Tables importa o test_utils pacote do diretório dentro do mesmo diretório do bloco de dlt_packages anotações. O dlt_packages diretório contém os arquivos test_utils.py e __init__.py, e test_utils.py define a função create_test_table():

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)