Condividi tramite


Esercitazione: Eseguire la prima pipeline di tabelle live Delta

Questa esercitazione illustra la procedura per configurare la prima pipeline di tabelle live Delta, scrivere codice ETL di base ed eseguire un aggiornamento della pipeline.

Tutti i passaggi di questa esercitazione sono progettati per le aree di lavoro con Il catalogo unity abilitato. È anche possibile configurare pipeline di tabelle live Delta per l'uso con il metastore Hive legacy. Vedere Use Delta Live Tables pipelines with legacy Hive metastore (Usare pipeline di tabelle live Delta con metastore Hive legacy).

Nota

Questa esercitazione contiene istruzioni per lo sviluppo e la convalida di un nuovo codice della pipeline usando i notebook di Databricks. È anche possibile configurare le pipeline usando il codice sorgente in file Python o SQL.

È possibile configurare una pipeline per eseguire il codice se il codice sorgente è già stato scritto usando la sintassi delle tabelle live Delta. Vedere Configurare una pipeline di tabelle live Delta.

È possibile usare la sintassi SQL completamente dichiarativa in Databricks SQL per registrare e impostare pianificazioni di aggiornamento per viste materializzate e tabelle di streaming come oggetti gestiti dal catalogo unity. Vedere Usare viste materializzate in Databricks SQL e Caricare dati usando tabelle di streaming in Databricks SQL.

Esempio: inserire ed elaborare i dati dei nomi dei bambini di New York

L'esempio in questo articolo usa un set di dati disponibile pubblicamente che contiene i record dei nomi dei bambini dello Stato di New York. Questo esempio illustra l'uso di una pipeline di tabelle live Delta per:

  • Leggere dati CSV non elaborati da un volume in una tabella.
  • Leggere i record dalla tabella di inserimento e usare le aspettative delle tabelle live Delta per creare una nuova tabella contenente dati puliti.
  • Usare i record puliti come input per le query Delta Live Tables che creano set di dati derivati.

Questo codice illustra un esempio semplificato dell'architettura medallion. Vedere Che cos'è l'architettura lakehouse medallion?.

Le implementazioni di questo esempio vengono fornite per Python e SQL. Seguire la procedura per creare una nuova pipeline e un nuovo notebook, quindi copiare e incollare il codice fornito.

Sono inoltre disponibili notebook di esempio con codice completo.

Requisiti

  • Per avviare una pipeline, è necessario avere l'autorizzazione di creazione del cluster o l'accesso a un criterio del cluster che definisce un cluster Delta Live Tables. Il runtime di DLT crea un cluster prima di eseguire la pipeline e ha esito negativo se non si ha l'autorizzazione corretta.

  • Per impostazione predefinita, tutti gli utenti possono attivare gli aggiornamenti usando pipeline serverless. Il serverless deve essere abilitato a livello di account e potrebbe non essere disponibile nell'area dell'area di lavoro. Consultare la sezione Abilitare l’elaborazione serverless.

  • Gli esempi in questa esercitazione usano Unity Catalog. Databricks consiglia di creare un nuovo schema per eseguire questa esercitazione, perché nello schema di destinazione vengono creati più oggetti di database.

    • Per creare un nuovo schema in un catalogo, è necessario disporre ALL PRIVILEGES di privilegi o o USE CATALOG CREATE SCHEMA .
    • Se non è possibile creare un nuovo schema, eseguire questa esercitazione su uno schema esistente. È necessario disporre dei privilegi seguenti:
      • USE CATALOG per il catalogo padre.
      • ALL PRIVILEGES o USE SCHEMA, CREATE MATERIALIZED VIEWe CREATE TABLE privilegi per lo schema di destinazione.
    • Questa esercitazione usa un volume per archiviare i dati di esempio. Databricks consiglia di creare un nuovo volume per questa esercitazione. Se si crea un nuovo schema per questa esercitazione, è possibile creare un nuovo volume in tale schema.
      • Per creare un nuovo volume in uno schema esistente, è necessario disporre dei privilegi seguenti:
        • USE CATALOG per il catalogo padre.
        • ALL PRIVILEGES o USE SCHEMA e CREATE VOLUME privilegi per lo schema di destinazione.
      • Facoltativamente, è possibile usare un volume esistente. È necessario disporre dei privilegi seguenti:
        • USE CATALOG per il catalogo padre.
        • USE SCHEMA per lo schema padre.
        • ALL PRIVILEGES oppure READ VOLUME e WRITE VOLUME nel volume di destinazione.

    Per impostare queste autorizzazioni, contattare l'amministratore di Databricks. Per altre informazioni sui privilegi del catalogo Unity, vedere Privilegi del catalogo Unity e oggetti a protezione diretta.

Passaggio 0: Scaricare i dati

Questo esempio carica i dati da un volume di Unity Catalog. Il codice seguente scarica un file CSV e lo archivia nel volume specificato. Aprire un nuovo notebook ed eseguire il codice seguente per scaricare questi dati nel volume specificato:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

Sostituire <catalog-name>, <schema-name> e <volume-name> con i nomi del catalogo, dello schema e del volume per un volume di Unity Catalog. Il codice fornito tenta di creare lo schema e il volume specificati se questi oggetti non esistono. È necessario disporre dei privilegi appropriati per creare e scrivere in oggetti in Unity Catalog. Vedere Requisiti.

Nota

Assicurarsi che il notebook sia stato eseguito correttamente prima di continuare con l'esercitazione. Non configurare questo notebook come parte della pipeline.

Passaggio 1: Creare una pipeline

Le tabelle live delta creano pipeline risolvendo le dipendenze definite nei notebook o nei file (denominati codice sorgente) usando la sintassi Delta Live Tables. Ogni file di codice sorgente può contenere una sola lingua, ma è possibile aggiungere più notebook o file specifici della lingua nella pipeline.

Importante

Non configurare alcun asset nel campo Codice sorgente. Lasciando questo campo nero viene creato e configurato un notebook per la creazione del codice sorgente.

Le istruzioni in questa esercitazione usano il calcolo serverless e il catalogo Unity. Usare le impostazioni predefinite per tutte le opzioni di configurazione non indicate in queste istruzioni.

Nota

Se serverless non è abilitato o supportato nell'area di lavoro, è possibile completare l'esercitazione come scritto usando le impostazioni di calcolo predefinite. È necessario selezionare manualmente Il catalogo Unity in Opzioni di archiviazione nella sezione Destinazione dell'interfaccia utente crea pipeline .

Per configurare una nuova pipeline, eseguire le operazioni seguenti:

  1. Fare clic su Delta Live Tables (Tabelle attive Delta) nella barra laterale.
  2. Fare clic su Crea pipeline.
  3. Specificare un nome di pipeline univoco.
  4. Selezionare la casella accanto a Serverless.
  5. Selezionare un catalogo per pubblicare i dati.
  6. Selezionare uno schema nel catalogo.
    • Specificare un nuovo nome di schema per creare uno schema.
  7. Definire tre parametri della pipeline usando il pulsante Aggiungi configurazione in Avanzate per aggiungere tre configurazioni. Specificare il catalogo, lo schema e il volume in cui sono stati scaricati i dati usando i nomi dei parametri seguenti:
    • my_catalog
    • my_schema
    • my_volume
  8. Cliccare su Crea.

L'interfaccia utente delle pipeline viene visualizzata per la pipeline appena creata. Un notebook del codice sorgente viene creato e configurato automaticamente per la pipeline.

Il notebook viene creato in una nuova directory nella directory utente. Il nome della nuova directory e del file corrisponde al nome della pipeline. Ad esempio: /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Un collegamento per accedere a questo notebook si trova nel campo Codice sorgente nel pannello Dettagli pipeline. Fare clic sul collegamento per aprire il notebook prima di procedere al passaggio successivo.

Passaggio 2: Dichiarare viste materializzate e tabelle di streaming in un notebook con Python o SQL

È possibile usare i notebook di Datbricks per sviluppare e convalidare in modo interattivo il codice sorgente per le pipeline di tabelle live Delta. Per usare questa funzionalità, è necessario collegare il notebook alla pipeline. Per collegare il notebook appena creato alla pipeline appena creata:

  1. Fare clic su Connetti in alto a destra per aprire il menu di configurazione delle risorse di calcolo.
  2. Passare il puntatore del mouse sul nome della pipeline creata nel passaggio 1.
  3. Fare clic su Connetti.

L'interfaccia utente cambia per includere i pulsanti Convalida e Avvio in alto a destra. Per altre informazioni sul supporto dei notebook per lo sviluppo di codice della pipeline, vedere Sviluppare ed eseguire il debug di pipeline di tabelle live Delta nei notebook.

Importante

  • Le pipeline di tabelle live delta valutano tutte le celle di un notebook durante la pianificazione. A differenza dei notebook eseguiti su calcolo all-purpose o pianificati come processi, le pipeline non garantiscono che le celle vengano eseguite nell'ordine specificato.
  • I notebook possono contenere solo un singolo linguaggio di programmazione. Non combinare codice Python e SQL nei notebook del codice sorgente della pipeline.

Per informazioni dettagliate sullo sviluppo di codice con Python o SQL, vedere Sviluppare codice pipeline con Python o Sviluppare codice pipeline con SQL.

Codice della pipeline di esempio

Per implementare l'esempio in questa esercitazione, copiare e incollare il codice seguente in una cella del notebook configurato come codice sorgente per la pipeline.

Il codice fornito esegue le operazioni seguenti:

  • Importa i moduli necessari (solo Python).
  • Parametri di riferimento definiti durante la configurazione della pipeline.
  • Definisce una tabella di streaming denominata baby_names_raw che inserisce da un volume.
  • Definisce una vista materializzata denominata baby_names_prepared che convalida i dati inseriti.
  • Definisce una vista materializzata denominata top_baby_names_2021 con una vista estremamente raffinata dei dati.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Passaggio 3: Avviare un aggiornamento della pipeline

Per avviare un aggiornamento della pipeline, fare clic sul pulsante Start in alto a destra nell'interfaccia utente del notebook.

Notebook di esempio

I notebook seguenti contengono gli stessi esempi di codice forniti in questo articolo. Questi notebook hanno gli stessi requisiti dei passaggi descritti in questo articolo. Vedere Requisiti.

Per importare un notebook, completare la procedura seguente:

  1. Aprire l'interfaccia utente del notebook.
    • Fare clic su + Nuovo>notebook.
    • Verrà aperto un notebook vuoto.
  2. Fare clic su File>Import (Importa). Verrà visualizzata la finestra di dialogo Importa.
  3. Selezionare l'opzione URL per Importa da.
  4. Incollare l'URL del notebook.
  5. Cliccare Importa.

Questa esercitazione richiede l'esecuzione di un notebook di configurazione dei dati prima di configurare ed eseguire la pipeline di tabelle live Delta. Importare il notebook seguente, collegare il notebook a una risorsa di calcolo, compilare la variabile necessaria per my_catalog, my_schemae my_volumee fare clic su Esegui tutto.

Esercitazione sul download dei dati per le pipeline

Ottenere il notebook

I notebook seguenti forniscono esempi in Python o SQL. Quando si importa un notebook, questo viene salvato nella home directory dell'utente.

Dopo aver importato uno dei notebook seguenti, completare i passaggi per creare una pipeline, ma usare la selezione file del codice sorgente per selezionare il notebook scaricato. Dopo aver creato la pipeline con un notebook configurato come codice sorgente, fare clic su Avvia nell'interfaccia utente della pipeline per attivare un aggiornamento.

Introduzione al Notebook Python per Delta Live Tables

Ottenere il notebook

Introduzione al Notebook SQL di Delta Live Tables

Ottenere il notebook