Compartir a través de


Tutorial: ejecute su primera canalización de Delta Live Tables

Este tutorial le guía por los pasos necesarios para configurar la primera canalización de Delta Live Tables, escribir código ETL básico y ejecutar una actualización de canalización.

Todos los pasos de este tutorial están diseñados para áreas de trabajo con el catálogo de Unity habilitado. También puede configurar canalizaciones de Delta Live Tables para que funcionen con el metastore de Hive heredado. Consulte Uso de canalizaciones de Delta Live Tables con metastore de Hive heredado.

Nota:

En este tutorial se proporcionan instrucciones para desarrollar y validar el nuevo código de canalización mediante cuadernos de Databricks. También puede configurar canalizaciones mediante código fuente en archivos Python o SQL.

Puede configurar una canalización para ejecutar el código si ya tiene código fuente escrito mediante la sintaxis de Delta Live Tables. Consulte Configuración de una canalización de Delta Live Tables.

Puede usar la sintaxis SQL totalmente declarativa en Databricks SQL para registrar y establecer programaciones de actualización para vistas materializadas y tablas de streaming como objetos administrados por el catálogo de Unity. Consulte Uso de vistas materializadas en Databricks SQL y Carga de datos mediante tablas de streaming en Databricks SQL.

Ejemplo: Ingesta y procesamiento de datos de nombres de bebés de Nueva York

En el ejemplo de este artículo se usa un conjunto de datos disponible públicamente que contiene registros de los nombres de bebés del estado de Nueva York. En este ejemplo se muestra cómo usar una canalización de Delta Live Tables para:

  • Lee los datos CSV sin procesar de un volumen en una tabla.
  • Lea los registros de la tabla de ingesta y use las expectativas de Delta Live Tables para crear una nueva tabla que contenga datos limpios .
  • Usar los registros limpios como entrada para las consultas de Delta Live Tables que crean conjuntos de datos derivados.

Este código muestra un ejemplo simplificado de la arquitectura medallion. Consulte ¿Qué es la arquitectura de la casa del lago medallion?.

Las implementaciones de este ejemplo se proporcionan para Python y SQL. Siga los pasos para crear una canalización y un cuaderno y, a continuación, copie y pegue el código proporcionado.

También se proporcionan cuadernos de ejemplo con código completo.

Requisitos

  • Para iniciar una canalización, debe tener permiso de creación de clústeres o acceso a una directiva de clúster que defina un clúster de Delta Live Tables. El entorno de ejecución de Delta Live Tables crea un clúster antes de que ejecute la canalización y se produce un error si no tiene el permiso correcto.

  • Todos los usuarios pueden desencadenar actualizaciones mediante canalizaciones sin servidor de forma predeterminada. Sin servidor debe estar habilitado en el nivel de cuenta y es posible que no esté disponible en la región del área de trabajo. Consulte Habilitación del proceso sin servidor.

  • En los ejemplos de este tutorial se usa el catálogo de Unity. Databricks recomienda crear un nuevo esquema para ejecutar este tutorial, ya que se crean varios objetos de base de datos en el esquema de destino.

    • Para crear un nuevo esquema en un catálogo, debe tener ALL PRIVILEGES o USE CATALOG y CREATE SCHEMA privilegios.
    • Si no puede crear un nuevo esquema, ejecute este tutorial en un esquema existente. Debe tener los siguientes privilegios:
      • USE CATALOG para el catálogo primario.
      • ALL PRIVILEGES o USE SCHEMA, CREATE MATERIALIZED VIEWy CREATE TABLE privilegios en el esquema de destino.
    • En este tutorial se usa un volumen para almacenar datos de ejemplo. Databricks recomienda crear un nuevo volumen para este tutorial. Si crea un nuevo esquema para este tutorial, puede crear un nuevo volumen en ese esquema.
      • Para crear un nuevo volumen en un esquema existente, debe tener los siguientes privilegios:
        • USE CATALOG para el catálogo primario.
        • ALL PRIVILEGES o USE SCHEMA y CREATE VOLUME privilegios en el esquema de destino.
      • Opcionalmente, puede usar un volumen existente. Debe tener los siguientes privilegios:
        • USE CATALOG para el catálogo primario.
        • USE SCHEMA para el esquema primario.
        • ALL PRIVILEGES o READ VOLUME y WRITE VOLUME en el volumen de destino.

    Para establecer estos permisos, póngase en contacto con el administrador de Databricks. Para obtener más información sobre los privilegios del catálogo de Unity, consulte Privilegios del catálogo de Unity y objetos protegibles.

Paso 0: Descargar datos

En este ejemplo se cargan datos de un volumen de Catálogo de Unity. El código siguiente descarga un archivo CSV y lo almacena en el volumen especificado. Abra un nuevo cuaderno y ejecute el código siguiente para descargar estos datos en el volumen especificado:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog. El código proporcionado intenta crear el esquema y el volumen especificados si estos objetos no existen. Debe tener los privilegios adecuados para crear y escribir en objetos en el catálogo de Unity. Vea Requisitos.

Nota:

Asegúrese de que este cuaderno se ha ejecutado correctamente antes de continuar con el tutorial. No configure este cuaderno como parte de la canalización.

Paso 1: Creación de una canalización

Delta Live Tables crea canalizaciones mediante la resolución de dependencias definidas en cuadernos o archivos (denominados código fuente) mediante la sintaxis de Delta Live Tables. Cada archivo de código fuente solo puede contener un idioma, pero puede agregar varios cuadernos o archivos específicos del lenguaje en la canalización.

Importante

No configure ningún recurso en el campo Código fuente. Dejar este campo negro crea y configura un cuaderno para la creación de código fuente.

Las instrucciones de este tutorial usan el proceso sin servidor y el catálogo de Unity. Use la configuración predeterminada para todas las opciones de configuración que no se mencionan en estas instrucciones.

Nota:

Si no está habilitado o compatible con el servidor en el área de trabajo, puede completar el tutorial como escrito mediante la configuración de proceso predeterminada. Debe seleccionar manualmente el catálogo de Unity en Opciones de almacenamiento en la sección Destino de la interfaz de usuario crear canalización .

Para configurar una nueva canalización, haga lo siguiente:

  1. Haga clic en Delta Live Tables (Tablas dinámicas delta) en la barra lateral.
  2. Haga clic en Crear canalización.
  3. Proporcione un nombre de canalización único.
  4. Active la casilla situada junto a Sin servidor.
  5. Seleccione un catálogo para publicar datos.
  6. Seleccione un esquema en el catálogo.
    • Especifique un nuevo nombre de esquema para crear un esquema.
  7. Defina tres parámetros de canalización mediante el botón Agregar configuración en Opciones avanzadas para agregar tres configuraciones. Especifique el catálogo, el esquema y el volumen en los que descargó los datos mediante los siguientes nombres de parámetro:
    • my_catalog
    • my_schema
    • my_volume
  8. Haga clic en Crear.

La interfaz de usuario de canalizaciones aparece para la canalización recién creada. Un cuaderno de código fuente se crea y configura automáticamente para la canalización.

El cuaderno se crea en un nuevo directorio del directorio de usuario. El nombre del nuevo directorio y archivo coinciden con el nombre de la canalización. Por ejemplo, /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Un vínculo para acceder a este cuaderno se encuentra en el campo Código fuente del panel Detalles de canalización. Haga clic en el vínculo para abrir el cuaderno antes de continuar con el paso siguiente.

Paso 2: Declarar vistas materializadas y tablas de streaming en un cuaderno con Python o SQL

Puede usar cuadernos de Datbricks para desarrollar y validar de forma interactiva el código fuente de las canalizaciones de Delta Live Tables. Debe adjuntar el cuaderno a la canalización para usar esta funcionalidad. Para adjuntar el cuaderno recién creado a la canalización que acaba de crear:

  1. Haga clic en Conectar en la esquina superior derecha para abrir el menú de configuración de proceso.
  2. Mantenga el puntero sobre el nombre de la canalización que creó en el paso 1.
  3. Haga clic en Conectar.

La interfaz de usuario cambia para incluir los botones Validar e Iniciar en la esquina superior derecha. Para más información sobre la compatibilidad de cuadernos con el desarrollo de código de canalización, consulte Desarrollo y depuración de canalizaciones de Delta Live Tables en cuadernos.

Importante

  • Las canalizaciones de Delta Live Tables evalúan todas las celdas de un cuaderno durante la planeación. A diferencia de los cuadernos que se ejecutan en procesos de uso completo o programados como trabajos, las canalizaciones no garantizan que las celdas se ejecuten en el orden especificado.
  • Los cuadernos solo pueden contener un solo lenguaje de programación. No combine código de Python y SQL en cuadernos de código fuente de canalización.

Para más información sobre el desarrollo de código con Python o SQL, consulte Desarrollo de código de canalización con Python o Desarrollo de código de canalización con SQL.

Código de canalización de ejemplo

Para implementar el ejemplo de este tutorial, copie y pegue el código siguiente en una celda del cuaderno configurado como código fuente para la canalización.

El código proporcionado hace lo siguiente:

  • Importa los módulos necesarios (solo Python).
  • Hace referencia a parámetros definidos durante la configuración de la canalización.
  • Define una tabla de streaming denominada baby_names_raw que ingiere desde un volumen.
  • Define una vista materializada denominada baby_names_prepared que valida los datos ingeridos.
  • Define una vista materializada denominada top_baby_names_2021 que tiene una vista muy refinada de los datos.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Paso 3: Iniciar una actualización de canalización

Para iniciar una actualización de canalización, haga clic en el botón Iniciar de la parte superior derecha de la interfaz de usuario del cuaderno.

Cuadernos de ejemplo

Los cuadernos siguientes contienen los mismos ejemplos de código proporcionados en este artículo. Estos cuadernos tienen los mismos requisitos que los pasos descritos en este artículo. Vea Requisitos.

Para importar un cuaderno, complete los pasos siguientes:

  1. Abra la interfaz de usuario del cuaderno.
    • Haga clic en + Nuevo>cuaderno.
    • Se abre un cuaderno vacío.
  2. Haga clic en Archivo>Importar. Aparece el cuadro de diálogo Importar.
  3. Seleccione la opción URL para Importar desde.
  4. Pegue la dirección URL del cuaderno.
  5. Haga clic en Import (Importar).

Este tutorial requiere que ejecute un cuaderno de configuración de datos antes de configurar y ejecutar la canalización de Delta Live Tables. Importe el cuaderno siguiente, adjunte el cuaderno a un recurso de proceso, rellene la variable necesaria para my_catalog, y , y my_volumehaga clic en Ejecutar todomy_schema.

Tutorial de descarga de datos para canalizaciones

Obtener el cuaderno

Los cuadernos siguientes proporcionan ejemplos en Python o SQL. Al importar un cuaderno, se guarda en el directorio principal del usuario.

Después de importar uno de los cuadernos siguientes, complete los pasos para crear una canalización, pero use el selector de archivos de código fuente para seleccionar el cuaderno descargado. Después de crear la canalización con un cuaderno configurado como código fuente, haga clic en Iniciar en la interfaz de usuario de canalización para desencadenar una actualización.

Introducción al cuaderno de Python de Delta Live Tables

Obtener el cuaderno

Introducción al cuaderno SQL de Python de Delta Live Tables

Obtener el cuaderno