Självstudie: Kör din första DLT-pipeline
I den här självstudien går vi igenom stegen för att konfigurera din första DLT-pipeline, skriva grundläggande ETL-kod och köra en pipelineuppdatering.
Alla steg i den här handledningen är utformade för arbetsytor med Unity Catalog aktiverat. Du kan också konfigurera DLT-pipelines så att de fungerar med det äldre Hive-metaarkivet. Se Använd DLT-pipelines med äldre Hive-metaarkiv.
Not
Den här studien innehåller instruktioner för utveckling och validering av ny pipelinekod med Databricks notebooks. Du kan också konfigurera pipelines med källkod i Python- eller SQL-filer.
Du kan konfigurera en pipeline för att köra koden om du redan har källkoden skriven med DLT-syntax. Se Konfigurera en DLT-pipeline.
Du kan använda fullständigt deklarativ SQL-syntax i Databricks SQL för att registrera och ange uppdateringsscheman för materialiserade vyer och strömmande tabeller som Unity Catalog-hanterade objekt. Se Använda materialiserade vyer i Databricks SQL och Läsa in data med hjälp av strömmande tabeller i Databricks SQL.
Exempel: Mata in och bearbeta data om babynamn i New York
Exemplet i den här artikeln använder en offentligt tillgänglig datauppsättning som innehåller poster för New York State-babynamn. Det här exemplet visar hur du använder en DLT-pipeline för att:
- Läs rå CSV-data från en volym till en tabell.
- Läs posterna från inmatningstabellen och använd DLT förväntningar för att skapa en ny tabell som innehåller rensade data.
- Använd de rensade posterna som indata till DLT-frågor som skapar härledda datauppsättningar.
Den här koden visar ett förenklat exempel på medaljongarkitekturen. Se Vad är medallion lakehouse-arkitekturen?.
Implementeringar av det här exemplet tillhandahålls för Python och SQL. Följ stegen för att skapa en ny pipeline och notebook-fil och kopiera och klistra sedan in den angivna koden.
Exempel anteckningsböcker med fullständig kod tillhandahålls också.
Krav
- Om du vill starta en pipeline måste du ha behörighet att skapa kluster eller åtkomst till en klusterprincip som definierar ett DLT-kluster. DLT-körningen skapar ett kluster innan den kör din pipeline och misslyckas om du inte har rätt behörighet.
- Alla användare kan utlösa uppdateringar med hjälp av serverlösa pipelines som standard. Serverlös måste vara aktiverad på kontonivå och kanske inte är tillgänglig i din arbetsyteregion. Se avsnitt Aktivera serverlös beräkning.
Exemplen i den här självstudien använder Unity Catalog. Databricks rekommenderar att du skapar ett nytt schema för att köra den här självstudien, eftersom flera databasobjekt skapas i målschemat.
- Om du vill skapa ett nytt schema i en katalog måste du ha
ALL PRIVILEGES
ellerUSE CATALOG
ochCREATE SCHEMA
behörigheter. - Om du inte kan skapa ett nytt schema bör du köra den här självstudien mot ett befintligt schema. Du måste ha följande behörigheter:
-
USE CATALOG
för den överordnade katalogen. -
ALL PRIVILEGES
ellerUSE SCHEMA
,CREATE MATERIALIZED VIEW
ochCREATE TABLE
behörigheter i målschemat.
-
- I den här självstudien används en volym för att lagra exempeldata. Databricks rekommenderar att du skapar en ny volym för den här handledningen. Om du skapar ett nytt schema för den här självstudien kan du skapa en ny volym i schemat.
- Om du vill skapa en ny volym i ett befintligt schema måste du ha följande behörigheter:
-
USE CATALOG
för den överordnade katalogen. -
ALL PRIVILEGES
ellerUSE SCHEMA
ochCREATE VOLUME
behörigheter i målschemat.
-
- Du kan valfritt använda en befintlig volym. Du måste ha följande behörigheter:
-
USE CATALOG
för den överordnade katalogen. -
USE SCHEMA
för huvudschemat. -
ALL PRIVILEGES
ellerREAD VOLUME
ochWRITE VOLUME
på målvolymen.
-
- Om du vill skapa en ny volym i ett befintligt schema måste du ha följande behörigheter:
Om du vill ange dessa behörigheter kontaktar du Databricks-administratören. Mer information om Behörigheter för Unity-katalog finns i Behörigheter för Unity-katalogen och skyddsbara objekt.
- Om du vill skapa ett nytt schema i en katalog måste du ha
steg 0: Ladda ned data
Det här exemplet läser in data från en Unity Catalog-volym. Följande kod laddar ned en CSV-fil och lagrar den på den angivna volymen. Öppna en ny notebook-fil och kör följande kod för att ladda ned dessa data till den angivna volymen:
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
Ersätt <catalog-name>
, <schema-name>
och <volume-name>
med katalog-, schema- och volymnamnen för en Unity Catalog-volym. Den angivna koden försöker skapa det angivna schemat och volymen om dessa objekt inte finns. Du måste ha rätt behörighet att skapa och skriva till objekt i Unity Catalog. Se Krav.
Anteckning
Kontrollera att anteckningsboken har körts framgångsrikt innan du fortsätter med handledningen. Konfigurera inte den här notebooken som en del av din pipeline.
steg 1: Skapa en pipeline
DLT skapar pipelines genom att lösa beroenden som definierats i notebooks eller filer (kallas källkod) med hjälp av DLT-syntaxen. Varje källkodsfil kan bara innehålla ett språk, men du kan lägga till flera språkspecifika notebook-filer eller filer i pipelinen.
Viktig
Konfigurera inte några tillgångar i fältet Källkod. Om du lämnar fältet svart skapas och konfigureras en notebook-fil för redigering av källkod.
Anvisningarna i den här självstudien använder serverlös beräkning och Unity Catalog. Använd standardinställningarna för alla konfigurationsalternativ som inte anges i de här anvisningarna.
Not
Om serverless inte är aktiverat eller stöds i din arbetsyta kan du genomföra självstudien enligt de förvalda beräkningsinställningarna. Du måste välja Unity Catalog manuellt under Lagringsalternativ i avsnittet Mål i användargränssnittet Skapa pipeline.
Gör följande för att konfigurera en ny pipeline:
- I sidofältet klickar du på DLT-.
- Klicka på Skapa pipelina.
- I Pipeline name, ange ett unikt pipelinenamn.
- Markera kryssrutan Serverlös.
- I Målväljer du en katalog och ett schemaför att konfigurera en Unity-katalogplats där tabeller publiceras.
- I Advancedklickar du på Lägg till konfiguration och definierar sedan pipelineparametrar för katalogen, schemat och volymen som du laddade ned data till med hjälp av följande parameternamn:
my_catalog
my_schema
my_volume
- Klicka på Skapa.
Användargränssnittet för pipelines visas för den nya pipelinen. En källkodsanteckningsbok skapas och konfigureras automatiskt för pipelinen.
Anteckningsboken skapas i en ny katalog i användarkatalogen. Namnet på den nya katalogen och filen matchar namnet på din pipeline. Till exempel /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
En länk för att komma åt den här notebook-filen finns under fältet Källkod i Pipeline-information panelen. Klicka på länken för att öppna anteckningsboken innan du fortsätter till nästa steg.
Steg 2: Deklarera materialiserade vyer och strömmande tabeller i en notebook-fil med Python eller SQL
Du kan använda Datbricks-notebook-filer för att interaktivt utveckla och validera källkoden för DLT-pipelines. Du måste koppla anteckningsboken till pipelinen för att kunna använda den här funktionen. Så här kopplar du den nyligen skapade notebook-filen till pipelinen som du nyss skapade:
- Klicka på Anslut i det övre högra hörnet för att öppna konfigurationsmenyn för beräkning.
- Hovra över namnet på pipelinen som du skapade i steg 1.
- Klicka på Anslut.
Användargränssnittet ändras till att omfatta knapparna Validate och Start i det övre högra hörnet. Mer information om notebook-stöd för utveckling av pipelinekod finns i Utveckla och felsöka DLT-pipelines i notebook-filer.
Viktig
- DLT-pipelines utvärderar alla celler i en notebook under planeringsstadiet. Till skillnad från notebook-filer som körs mot all-purpose compute eller schemalagda som jobb, garanterar pipelines inte att celler körs i den angivna ordningen.
- Notebook-filer kan bara innehålla ett enda programmeringsspråk. Blanda inte Python- och SQL-kod i pipelinens källkodsanteckningsböcker.
Mer information om hur du utvecklar kod med Python eller SQL finns i Utveckla pipelinekod med Python eller Utveckla pipelinekod med SQL.
Exempel på pipelinekod
Om du vill implementera exemplet i den här självstudien kopierar du och klistrar in följande kod i en cell i anteckningsboken som konfigurerats som källkod för din pipeline.
Den angivna koden gör följande:
- Importerar nödvändiga moduler (endast Python).
- Refererar till parametrar som definierats under pipelinekonfigurationen.
- Definierar en strömmande tabell vid namnet
baby_names_raw
som mottar data från en volym. - Definierar en materialiserad vy med namnet
baby_names_prepared
som validerar inmatade data. - Definierar en materialiserad vy med namnet
top_baby_names_2021
som har en mycket förfinad vy över data.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
steg 3: Starta en pipelineuppdatering
Starta en pipelineuppdatering genom att klicka på knappen Starta längst upp till höger i notebook-användargränssnittet.
Exempelanteckningsböcker
Följande notebook-filer innehåller samma kodexempel som anges i den här artikeln. Dessa anteckningsböcker har samma krav som instruktionerna i den här artikeln. Se Krav.
Utför följande steg för att importera en notebook:
- Öppna notebook-användargränssnittet.
- Klicka på + Ny>anteckningsbok.
- En tom anteckningsbok öppnas.
- Klicka på Fil>Import.... Dialogrutan Import visas.
- Välj alternativet URL för Importera från.
- Klistra in URL:en för anteckningsboken.
- Klicka på Importera.
Den här handledningen kräver att du kör en notebook-fil för datainstallation innan du konfigurerar och kör din DLT-pipeline. Importera följande anteckningsbok, koppla den till en beräkningsresurs, fyll i den obligatoriska variabeln för my_catalog
, my_schema
och my_volume
och klicka på Kör alla.
Självstudie om datanedladdning för pipelines
Följande notebook-filer innehåller exempel i Python eller SQL. När du importerar en notebook-fil sparas den i användarens hemkatalog.
När du har importerat någon av notebooks nedan, slutför du stegen för att skapa en pipeline, men använder filväljaren i Källkod för att välja den nedladdade notebook-filen. När du har skapat pipelinen med en notebook-fil som konfigurerats som källkod klickar du på Starta i pipelinegränssnittet för att utlösa en uppdatering.