Självstudie: Kör din första Delta Live Tables-pipeline
I den här självstudien går vi igenom stegen för att konfigurera din första Delta Live Tables-pipeline, skriva grundläggande ETL-kod och köra en pipelineuppdatering.
Alla steg i den här självstudien är utformade för arbetsytor med Unity Catalog aktiverat. Du kan också konfigurera Delta Live Tables-pipelines så att de fungerar med det äldre Hive-metaarkivet. Se Använda Delta Live Tables-pipelines med äldre Hive-metaarkiv.
Kommentar
Den här självstudien innehåller instruktioner för att utveckla och validera ny pipelinekod med databricks notebook-filer. Du kan också konfigurera pipelines med källkod i Python- eller SQL-filer.
Du kan konfigurera en pipeline för att köra koden om du redan har källkoden skriven med hjälp av Delta Live Tables-syntax. Se Konfigurera en Delta Live Tables-pipeline.
Du kan använda fullständigt deklarativ SQL-syntax i Databricks SQL för att registrera och ange uppdateringsscheman för materialiserade vyer och strömmande tabeller som Unity Catalog-hanterade objekt. Se Använda materialiserade vyer i Databricks SQL och Läs in data med hjälp av strömmande tabeller i Databricks SQL.
Exempel: Mata in och bearbeta data om babynamn i New York
Exemplet i den här artikeln använder en offentligt tillgänglig datauppsättning som innehåller poster med babynamn i New York State. Det här exemplet visar hur du använder en Delta Live Tables-pipeline för att:
- Läsa rådata från en volym till en tabell.
- Läs posterna från inmatningstabellen och använd Delta Live Tables-förväntningar för att skapa en ny tabell som innehåller rensade data.
- Använd de rensade posterna som indata till Delta Live Tables-frågor som skapar härledda datauppsättningar.
Den här koden visar ett förenklat exempel på medaljongarkitekturen. Se Vad är medallion lakehouse arkitektur?.
Implementeringar av det här exemplet tillhandahålls för Python och SQL. Följ stegen för att skapa en ny pipeline och notebook-fil och kopiera och klistra sedan in den angivna koden.
Exempel på notebook-filer med fullständig kod finns också.
Krav
Om du vill starta en pipeline måste du ha behörighet att skapa kluster eller komma åt en klusterprincip som definierar ett Delta Live Tables-kluster. Delta Live Tables-körningen skapar ett kluster innan din pipeline körs och misslyckas om du inte har rätt behörighet.
Alla användare kan utlösa uppdateringar med hjälp av serverlösa pipelines som standard. Serverlös måste vara aktiverad på kontonivå och kanske inte är tillgänglig i din arbetsyteregion. Se Aktivera serverlös beräkning.
Exemplen i den här självstudien använder Unity Catalog. Databricks rekommenderar att du skapar ett nytt schema för att köra den här självstudien, eftersom flera databasobjekt skapas i målschemat.
- Om du vill skapa ett nytt schema i en katalog måste du ha
ALL PRIVILEGES
ellerUSE CATALOG
ochCREATE SCHEMA
behörigheter. - Om du inte kan skapa ett nytt schema kör du den här självstudien mot ett befintligt schema. Du måste ha följande behörigheter:
USE CATALOG
för den överordnade katalogen.ALL PRIVILEGES
ellerUSE SCHEMA
,CREATE MATERIALIZED VIEW
ochCREATE TABLE
behörigheter för målschemat.
- I den här självstudien används en volym för att lagra exempeldata. Databricks rekommenderar att du skapar en ny volym för den här självstudien. Om du skapar ett nytt schema för den här självstudien kan du skapa en ny volym i schemat.
- Om du vill skapa en ny volym i ett befintligt schema måste du ha följande behörigheter:
USE CATALOG
för den överordnade katalogen.ALL PRIVILEGES
ellerUSE SCHEMA
ochCREATE VOLUME
behörigheter i målschemat.
- Du kan också använda en befintlig volym. Du måste ha följande behörigheter:
USE CATALOG
för den överordnade katalogen.USE SCHEMA
för det överordnade schemat.ALL PRIVILEGES
ellerREAD VOLUME
påWRITE VOLUME
målvolymen.
- Om du vill skapa en ny volym i ett befintligt schema måste du ha följande behörigheter:
Om du vill ange dessa behörigheter kontaktar du Databricks-administratören. Mer information om Behörigheter för Unity-katalog finns i Behörigheter för Unity-katalog och skyddsbara objekt.
- Om du vill skapa ett nytt schema i en katalog måste du ha
Steg 0: Ladda ned data
Det här exemplet läser in data från en Unity Catalog-volym. Följande kod laddar ned en CSV-fil och lagrar den på den angivna volymen. Öppna en ny notebook-fil och kör följande kod för att ladda ned dessa data till den angivna volymen:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Ersätt <catalog-name>
, <schema-name>
och <volume-name>
med katalog-, schema- och volymnamnen för en Unity Catalog-volym. Den angivna koden försöker skapa det angivna schemat och volymen om dessa objekt inte finns. Du måste ha rätt behörighet att skapa och skriva till objekt i Unity Catalog. Se Krav.
Kommentar
Kontrollera att anteckningsboken har körts innan du fortsätter med självstudien. Konfigurera inte den här notebook-filen som en del av din pipeline.
Steg 1: Skapa en pipeline
Delta Live Tables skapar pipelines genom att matcha beroenden som definierats i notebook-filer (kallas källkod) med hjälp av Delta Live Tables-syntax. Varje källkodsfil kan bara innehålla ett språk, men du kan lägga till flera språkspecifika notebook-filer eller filer i pipelinen.
Viktigt!
Konfigurera inte några tillgångar i fältet Källkod . Om du lämnar fältet svart skapas och konfigureras en notebook-fil för redigering av källkod.
Anvisningarna i den här självstudien använder serverlös beräkning och Unity Catalog. Använd standardinställningarna för alla konfigurationsalternativ som inte nämns i de här anvisningarna.
Kommentar
Om serverlös inte är aktiverad eller stöds på din arbetsyta kan du slutföra självstudien enligt standardinställningarna för beräkning. Du måste välja Unity Catalog manuellt under Lagringsalternativ i avsnittet Mål i användargränssnittet för skapa pipeline.
Gör följande för att konfigurera en ny pipeline:
- Klicka på Delta Live Tables (Delta Live Tables ) i sidofältet.
- Klicka på Skapa pipeline.
- Ange ett unikt pipelinenamn.
- Markera kryssrutan bredvid Serverlös.
- Välj en katalog för att publicera data.
- Välj ett schema i katalogen.
- Ange ett nytt schemanamn för att skapa ett schema.
- Definiera tre pipelineparametrar med knappen Lägg till konfiguration under Avancerat för att lägga till tre konfigurationer. Ange den katalog, det schema och den volym som du laddade ned data till med hjälp av följande parameternamn:
my_catalog
my_schema
my_volume
- Klicka på Skapa.
Användargränssnittet för pipelines visas för den nyligen skapade pipelinen. En källkodsanteckningsbok skapas och konfigureras automatiskt för pipelinen.
Anteckningsboken skapas i en ny katalog i användarkatalogen. Namnet på den nya katalogen och filen matchar namnet på din pipeline. Exempel: /Users/your.username@databricks.com/my_pipeline/my_pipeline
En länk för att komma åt den här notebook-filen finns under fältet Källkod i panelen Pipelineinformation . Klicka på länken för att öppna anteckningsboken innan du fortsätter till nästa steg.
Steg 2: Deklarera materialiserade vyer och strömmande tabeller i en notebook-fil med Python eller SQL
Du kan använda Datbricks-notebook-filer för att interaktivt utveckla och verifiera källkoden för Delta Live Tables-pipelines. Du måste koppla anteckningsboken till pipelinen för att kunna använda den här funktionen. Så här kopplar du den nyligen skapade notebook-filen till pipelinen som du nyss skapade:
- Klicka på Anslut i det övre högra hörnet för att öppna konfigurationsmenyn för beräkning.
- Hovra över namnet på pipelinen som du skapade i steg 1.
- Klicka på Anslut.
Användargränssnittet ändras till att inkludera knapparna Verifiera och Starta i det övre högra hörnet. Mer information om notebook-stöd för utveckling av pipelinekod finns i Utveckla och felsöka Delta Live Tables-pipelines i notebook-filer.
Viktigt!
- Delta Live Tables-pipelines utvärderar alla celler i en notebook-fil under planeringen. Till skillnad från notebook-filer som körs mot all-purpose compute eller schemalagda som jobb, garanterar pipelines inte att celler körs i den angivna ordningen.
- Notebook-filer kan bara innehålla ett enda programmeringsspråk. Blanda inte Python- och SQL-kod i pipelinens källkodsanteckningsböcker.
Mer information om hur du utvecklar kod med Python eller SQL finns i Utveckla pipelinekod med Python eller Utveckla pipelinekod med SQL.
Exempel på pipelinekod
Om du vill implementera exemplet i den här självstudien kopierar du och klistrar in följande kod i en cell i anteckningsboken som konfigurerats som källkod för din pipeline.
Den angivna koden gör följande:
- Importerar nödvändiga moduler (endast Python).
- Refererar till parametrar som definierats under pipelinekonfigurationen.
- Definierar en strömmande tabell med namnet
baby_names_raw
som matar in från en volym. - Definierar en materialiserad vy med namnet
baby_names_prepared
som validerar inmatade data. - Definierar en materialiserad vy med namnet
top_baby_names_2021
som har en mycket förfinad vy över data.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Steg 3: Starta en pipelineuppdatering
Starta en pipelineuppdatering genom att klicka på knappen Start längst upp till höger i notebook-användargränssnittet.
Exempel på notebook-filer
Följande notebook-filer innehåller samma kodexempel som anges i den här artikeln. Dessa notebook-filer har samma krav som stegen i den här artikeln. Se Krav.
Utför följande steg för att importera en notebook-fil:
- Öppna notebook-användargränssnittet.
- Klicka på + Ny>anteckningsbok.
- En tom anteckningsbok öppnas.
- Klicka på Arkiv>Importera. Dialogrutan Importera visas.
- Välj URL-alternativet för Importera från.
- Klistra in URL:en för notebook-filen.
- Klicka på Importera.
Den här självstudien kräver att du kör en notebook-fil för datainstallation innan du konfigurerar och kör din Delta Live Tables-pipeline. Importera följande notebook-fil, koppla anteckningsboken till en beräkningsresurs, fyll i variabeln som krävs för my_catalog
, my_schema
och my_volume
och klicka på Kör alla.
Självstudie om datanedladdning för pipelines
Följande notebook-filer innehåller exempel i Python eller SQL. När du importerar en notebook-fil sparas den i användarens hemkatalog.
När du har importerat en av anteckningsböckerna nedan slutför du stegen för att skapa en pipeline, men använder filväljaren källkod för att välja den nedladdade anteckningsboken. När du har skapat pipelinen med en notebook-fil som konfigurerats som källkod klickar du på Starta i pipelinegränssnittet för att utlösa en uppdatering.