Compartir a través de


Tutorial: Ejecución de una canalización de análisis de un extremo a otro en Lakehouse

En este tutorial se muestra cómo configurar una canalización de análisis de un extremo a otro para una instancia de almacén de lago de datos de Azure Databricks.

Importante

En este tutorial se usan cuadernos interactivos para completar tareas comunes de ETL de Python en clústeres habilitados para Unity Catalog. Si no usa el catálogo de Unity, consulte Ejecución de la primera carga de trabajo de ETL en Azure Databricks.

Tareas de este tutorial

Al final de este artículo, podrá hacer lo siguiente:

  1. Iniciar un clúster de proceso habilitado para Unity Catalog.
  2. Crear un cuaderno de Databricks.
  3. Escribir y leer datos desde una ubicación externa de Unity Catalog.
  4. Configurar la ingesta de datos incrementales en una tabla de Unity Catalog con Auto Loader.
  5. Ejecutar celdas de cuaderno para procesar, consultar y obtener una versión preliminar de los datos.
  6. Programar un cuaderno como un trabajo de Databricks.
  7. Consultar tablas de Unity Catalog desde Databricks SQL

Azure Databricks proporciona un conjunto de herramientas que están listas para producción y que permiten a los profesionales de datos desarrollar e implementar con rapidez canalizaciones de extracción, transformación y carga (ETL). Unity Catalog permite a los administradores de datos configurar y proteger las credenciales de almacenamiento, las ubicaciones externas y los objetos de base de datos para los usuarios de toda la organización. Databricks SQL permite a los analistas ejecutar consultas SQL en las mismas tablas que se usan en cargas de trabajo ETL de producción, lo que hace posible la inteligencia empresarial en tiempo real a escala.

También puede usar Delta Live Tables para compilar canalizaciones de ETL. Databricks creó Delta Live Tables para reducir la complejidad de la creación, implementación y mantenimiento de canalizaciones de ETL de producción. Consulte Tutorial: ejecute su primera canalización de Delta Live Tables.

Requisitos

Nota:

Aunque no tenga privilegios de control de clúster, puede completar la mayoría de los pasos siguientes siempre que tenga acceso a un clúster.

Paso 1: creación de un clúster

Para realizar análisis de datos exploratorios e ingeniería de datos, puede crear un clúster a fin de proporcionar los recursos de proceso necesarios para ejecutar comandos.

  1. Haga clic en Icono Proceso Proceso en la barra lateral.
  2. Haga clic en Icono NuevoNuevo en la barra lateral y seleccione Clúster. Se abrirá la página Nuevo clúster/Proceso.
  3. Especifique un nombre único para el clúster.
  4. Seleccione el botón de radio Nodo único.
  5. Seleccione Usuario único en la lista desplegable Modo de acceso.
  6. Asegúrese de que la dirección de correo electrónico esté visible en el campo Usuario único.
  7. Seleccione la versión de runtime de Databricks necesaria, 11.1 o superior, para usar Unity Catalog.
  8. Haga clic en Crear proceso para crear el clúster.

Para obtener más información sobre los clústeres de Databricks, consulte Compute.

Paso 2: creación de un cuaderno de Databricks

Para crear un cuaderno en el área de trabajo, haga clic en Nuevo icono Nuevo en la barra lateral y, después, haga clic en Cuaderno. Se abre un cuaderno en blanco en el área de trabajo.

Para obtener más información sobre cómo crear y administrar cuadernos, consulte Administración de cuadernos.

Paso 3: escritura y lectura de datos desde una ubicación externa que administra Unity Catalog

Databricks recomienda usar el Auto Loader para la ingesta de datos incremental. Auto Loader detecta y procesa automáticamente los nuevos archivos a medida que llegan al almacenamiento de objetos en la nube.

Use Unity Catalog para administrar el acceso seguro a ubicaciones externas. Los usuarios o entidades de servicio con permisos READ FILES en una ubicación externa pueden usar Auto Loader para ingerir datos.

Normalmente, los datos llegarán a una ubicación externa, debido a las escrituras de otros sistemas. En esta demostración, puede simular la llegada de datos mediante la escritura de archivos JSON en una ubicación externa.

Copie el código siguiente en una celda del cuaderno. Reemplace el valor de cadena de catalog por el nombre de un catálogo con permisos de CREATE CATALOG y USE CATALOG. Reemplace el valor de cadena de external_location por la ruta de acceso de una ubicación externa con permisos de READ FILES, WRITE FILES y CREATE EXTERNAL TABLE.

Las ubicaciones externas se pueden definir como un contenedor de almacenamiento completo, pero suelen apuntar a un directorio que esté anidado en un contenedor.

El formato correcto para una ruta de acceso de ubicación externa es "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

La ejecución de esta celda debe imprimir una línea en la que se lee 12 bytes, imprimir la cadena "Hello world!", así como mostrar todas las bases de datos del catálogo que se haya proporcionado. Si no puede ejecutar esta celda, confirme que está en un área de trabajo habilitada para Unity Catalog y solicite los permisos adecuados del administrador del área de trabajo para completar este tutorial.

El siguiente código de Python usa la dirección de correo electrónico para crear una base de datos única en el catálogo que se ha proporcionado, así como una ubicación de almacenamiento única en la ubicación externa proporcionada. Cuando se ejecute esta celda, se quitarán todos los datos que están asociados a este tutorial, lo que le permitirá ejecutar este ejemplo de manera idempotente. Se define una clase y se crea una instancia que se usará para simular lotes de datos que lleguen desde un sistema conectado a la ubicación externa de origen.

Copie este código en una nueva celda del cuaderno y ejecútelo para configurar el entorno.

Nota

Las variables que se definen en este código deben permitirle que la ejecute de forma segura, sin correr el riesgo de entrar en conflicto con los recursos del área de trabajo existentes u otros usuarios. Los permisos de red o almacenamiento restringidos producirán errores cuando ejecuten este código; póngase en contacto con el administrador del área de trabajo para solucionar problemas de estas restricciones.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Ahora puede obtener un lote de datos al copiar el código siguiente en una celda y ejecutándolo. Puede ejecutar esta celda manualmente hasta 60 veces, a fin de desencadenar la nueva llegada de datos.

RawData.land_batch()

Paso 4: configuración de Auto Loader para ingerir datos en Unity Catalog

Databricks recomienda el almacenamiento de datos con Delta Lake. Delta Lake es una capa de almacenamiento de código abierto que proporciona transacciones ACID y habilita el almacén de lago de datos. Delta Lake es el formato predeterminado para las tablas que se crean en Databricks.

Para configurar Auto Loader a fin de ingerir datos en Unity Catalog, copie y pegue el código siguiente en una celda vacía del cuaderno:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Para obtener más información sobre Auto Loader, consulte Auto Loader.

Para obtener más información sobre Structured Streaming con Unity Catalog, consulte Uso de Unity Catalog con Structured Streaming.

Paso 5: procesamiento e interacción con datos

Los cuadernos ejecutan la lógica celda por celda. Siga estos pasos para ejecutar la lógica en la celda:

  1. Para ejecutar la celda que completó en el paso anterior, selecciónela y presione SHIFT+ENTER.

  2. Para consultar la tabla que acaba de crear, copie y pegue el código siguiente en una celda vacía y, a continuación, presione SHIFT+ENTER para ejecutar la celda.

    df = spark.read.table(table)
    
  3. Para obtener una versión preliminar de los datos de DataFrame, copie y pegue el código siguiente en una celda vacía y, a continuación, presione SHIFT+ENTER para ejecutar la celda.

    display(df)
    

Para obtener más información sobre las opciones interactivas para visualizar los datos, consulte Visualizaciones en cuadernos de Databricks.

Paso 6: programación de un trabajo

Puede ejecutar cuadernos de Databricks como scripts de producción al agregarlos como una tarea en un trabajo de Databricks. En este paso, creará un nuevo trabajo que podrá desencadenar manualmente.

Para programar el cuaderno como una tarea:

  1. Haga clic en Programación en el lado derecho de la barra de encabezado.
  2. Introduzca un nombre único para el nombre del trabajo.
  3. Haga clic en Manual.
  4. En la lista desplegable Clúster, seleccione el clúster que creó en el paso 1.
  5. Haga clic en Crear.
  6. En la ventana que aparece, haga clic en Ejecutar ahora.
  7. Para ver los resultados de la ejecución del trabajo, haga clic en el icono Vínculo externo junto a la marca de tiempo Última ejecución.

Para obtener más información sobre los trabajos, consulte ¿Qué son los trabajos de Databricks?

Paso 7: consulta de tabla desde Databricks SQL

Cualquier persona con el permiso USE CATALOG en el catálogo actual, el permiso USE SCHEMA en el esquema actual y los permisos SELECT en la tabla puede consultar el contenido de la tabla desde la API de Databricks que prefiera.

Necesita acceder a un almacén de SQL en ejecución para ejecutar consultas en Databricks SQL.

La tabla que creó antes en este tutorial se llama target_table. Puede consultarlo mediante el catálogo que proporcionó en la primera celda y la base de datos con el patrón e2e_lakehouse_<your-username>. Puede usar Catalog Explorer para buscar los objetos de datos que ha creado.

Otras integraciones

Obtenga más información sobre las integraciones y herramientas para la ingeniería de datos con Azure Databricks: