Compartir a través de


Importación de módulos de Python desde carpetas de Git o archivos de área de trabajo

Puede almacenar código de Python en Carpetas de Git de Databricks o en Archivos del área de trabajo e importar ese código de Python en las canalizaciones de Delta Live Tables. Para obtener más información sobre cómo trabajar con módulos en carpetas de Git o archivos de área de trabajo, vea Trabajar con módulos de Python y R.

Nota:

No se puede importar el código fuente de un cuaderno almacenado en una carpeta de Git de Databricks ni en un archivo de área de trabajo. En su lugar, agregue el cuaderno directamente al crear o editar una canalización. Consulte Configuración de una canalización de Delta Live Tables.

Importación de un módulo de Python a una canalización de Delta Live Tables

En el ejemplo siguiente se muestra cómo importar consultas de conjunto de datos como módulos de Python desde archivos del área de trabajo. Aunque en este ejemplo se describe el uso de archivos de área de trabajo para almacenar el código fuente de la canalización, puede usarlo con código fuente almacenado en una carpeta Git.

Para ejecutar este ejemplo, siga estos pasos:

  1. Haga clic en Icono de las áreas de trabajo Área de trabajo en la barra lateral del área de trabajo de Azure Databricks para abrir el explorador del área de trabajo.

  2. Use el explorador del área de trabajo para seleccionar un directorio para los módulos de Python.

  3. Haga clic Kebab en la columna situada a la derecha del directorio seleccionado y haga clic en Crear>Archivo.

  4. Escriba un nombre para el archivo, por ejemplo, clickstream_raw_module.py. Se abre el editor de archivos. Para crear un módulo para leer los datos de origen en una tabla, escriba lo siguiente en la ventana del editor:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. Para crear un módulo que cree una nueva tabla que contenga datos preparados, cree un nuevo archivo en el mismo directorio, escriba un nombre para el archivo, por ejemplo, clickstream_prepared_module.py, y escriba lo siguiente en la nueva ventana del editor:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. A continuación, cree un cuaderno de canalización. Vaya a la página de aterrizaje de Azure Databricks y seleccione Crear un cuaderno, o haga clic en Icono Nuevo Nuevo en la barra lateral y seleccione Notebook. También puede crear el cuaderno en el explorador del área de trabajo; para ello, haga clic en Kebab y haga clic en Crear > Notebook.

  7. Asigne un nombre al cuaderno y confirme que Python es el idioma predeterminado.

  8. Haga clic en Crear.

  9. Escriba el código de ejemplo en el cuaderno.

    Nota:

    Si el cuaderno importa módulos o paquetes desde una ruta de acceso de archivos del área de trabajo o una ruta de acceso de carpetas de Git diferente del directorio del cuaderno, debe anexar manualmente la ruta de acceso a los archivos mediante sys.path.append().

    Si va a importar un archivo desde una carpeta Git, debe anteponer /Workspace/ a la ruta de acceso. Por ejemplo, sys.path.append('/Workspace/...'). Si se omite /Workspace/ de la ruta de acceso, se produce un error.

    Si los módulos o paquetes se almacenan en el mismo directorio que el cuaderno, no es necesario anexar la ruta de acceso manualmente. Tampoco es necesario anexar manualmente la ruta de acceso al importar desde el directorio raíz de una carpeta Git porque el directorio raíz se anexa automáticamente a la ruta de acceso.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("LIVE.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Reemplace <module-path> por la ruta de acceso al directorio que contiene los módulos de Python que se van a importar.

  10. Crear una canalización usando el nuevo cuaderno.

  11. Para ejecutar la canalización, en la página Detalles de la canalización haga clic en Iniciar.

También puede importar código de Python como paquete. El siguiente fragmento de código de un cuaderno Delta Live Tables importa eltest_utils paquete desde el dlt_packages directorio dentro del mismo directorio que el cuaderno. El directorio dlt_packages contiene los archivos test_utils.py y __init__.py, y test_utils.py define la función create_test_table():

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)