Delen via


Zelfstudie: Uw eerste DLT-pijplijn uitvoeren

In deze zelfstudie doorloopt u de stappen voor het configureren van uw eerste DLT-pijplijn, het schrijven van eenvoudige ETL-code en het uitvoeren van een pijplijnupdate.

Alle stappen in deze zelfstudie zijn ontworpen voor werkruimten waarvoor Unity Catalog is ingeschakeld. U kunt DLT-pijplijnen ook configureren voor gebruik met de verouderde Hive-metastore. Zie DLT-pijplijnen gebruiken met verouderde Hive-metastore.

Notitie

Deze zelfstudie bevat instructies voor het ontwikkelen en valideren van nieuwe pijplijncode met behulp van Databricks-notebooks. U kunt pijplijnen ook configureren met behulp van broncode in Python- of SQL-bestanden.

U kunt een pijplijn configureren om uw code uit te voeren als u al broncode hebt geschreven met behulp van DLT-syntaxis. Zie Een DLT-pijplijn configureren.

U kunt de volledig declaratieve SQL-syntaxis in Databricks SQL gebruiken om vernieuwingsschema's te registreren en in te stellen voor gerealiseerde weergaven en streamingtabellen als door Unity Catalog beheerde objecten. Zie Gerealiseerde weergaven gebruiken in Databricks SQL en Gegevens laden met behulp van streamingtabellen in Databricks SQL.

Voorbeeld: Gegevens van de babynamen van New York opnemen en verwerken

In het voorbeeld in dit artikel wordt een openbaar beschikbare gegevensset gebruikt die records van New York State babynamenbevat. In dit voorbeeld ziet u hoe u een DLT-pijplijn gebruikt om:

  • Onbewerkte CSV-gegevens van een volume lezen in een tabel.
  • Lees de records uit de opnametabel en gebruik DLT verwachtingen om een nieuwe tabel te maken die opgeschoonde gegevens bevat.
  • Gebruik de opgeschoonde records als invoer voor DLT-query's die afgeleide gegevenssets maken.

Deze code demonstreert een vereenvoudigd voorbeeld van de medallion-architectuur. Zie Wat is de medallion Lakehouse-architectuur?.

Implementaties van dit voorbeeld zijn beschikbaar voor Python en SQL. Volg de stappen om een nieuwe pijplijn en notebook te maken en kopieer en plak de opgegeven code.

Er worden ook voorbeelden van notebooks met volledige code verstrekt.

Eisen

  • Als u een pijplijn wilt starten, moet u beschikken over machtiging voor het maken van clusters of toegang tot een clusterbeleid dat een DLT-cluster definieert. De DLT-runtime maakt een cluster aan voordat het uw pijplijn uitvoert en geeft een foutmelding als u niet over de juiste toestemming beschikt.
  • Alle gebruikers kunnen standaard updates activeren met behulp van serverloze pijplijnen. Serverloos moet zijn ingeschakeld op accountniveau en is mogelijk niet beschikbaar in uw werkruimteregio. Zie Inschakelen van serverloze computerbronnen.
  • In de voorbeelden in deze zelfstudie wordt Unity Cataloggebruikt. Databricks raadt aan een nieuw schema te maken om deze zelfstudie uit te voeren, omdat er meerdere databaseobjecten worden gemaakt in het doelschema.

    • Als u een nieuw schema in een catalogus wilt maken, moet u ALL PRIVILEGES of USE CATALOG en CREATE SCHEMA bevoegdheden hebben.
    • Als u geen nieuw schema kunt maken, voert u deze zelfstudie uit op basis van een bestaand schema. U moet de volgende bevoegdheden hebben:
      • USE CATALOG voor de hoofdcatalogus.
      • ALL PRIVILEGES of USE SCHEMA, CREATE MATERIALIZED VIEWen CREATE TABLE bevoegdheden voor het doelschema.
    • In deze zelfstudie wordt een volume gebruikt om voorbeeldgegevens op te slaan. Databricks raadt aan een nieuw volume te maken voor deze zelfstudie. Als u een nieuw schema voor deze zelfstudie maakt, kunt u een nieuw volume in dat schema maken.
      • Als u een nieuw volume in een bestaand schema wilt maken, moet u de volgende bevoegdheden hebben:
        • USE CATALOG voor de bovenliggende catalogus.
        • ALL PRIVILEGES of USE SCHEMA en CREATE VOLUME bevoegdheden voor het doelschema.
      • U kunt eventueel een bestaand volume gebruiken. U moet de volgende bevoegdheden hebben:
        • USE CATALOG voor de hoofdcatalogus.
        • USE SCHEMA voor het bovenliggende schema.
        • ALL PRIVILEGES of READ VOLUME en WRITE VOLUME op het doelvolume.

    Neem contact op met uw Databricks-beheerder om deze machtigingen in te stellen. Zie Unity Catalog-bevoegdheden en beveiligbare objectenvoor meer informatie over Unity Catalog-bevoegdheden.

stap 0: Gegevens downloaden

In dit voorbeeld worden gegevens uit een Unity Catalog-volume geladen. Met de volgende code wordt een CSV-bestand gedownload en opgeslagen in het opgegeven volume. Open een nieuw notebook en voer de volgende code uit om deze gegevens te downloaden naar het opgegeven volume:

import urllib

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

urllib.request.urlretrieve(download_url, volume_path + filename)

Vervang <catalog-name>, <schema-name>en <volume-name> door de catalogus-, schema- en volumenamen voor een Unity Catalog-volume. De opgegeven code probeert het opgegeven schema en volume te maken als deze objecten niet bestaan. U moet over de juiste bevoegdheden beschikken om objecten te maken en te schrijven in Unity Catalog. Zie Vereisten.

Notitie

Zorg ervoor dat dit notitieboek met succes is uitgevoerd voordat u verdergaat met de zelfstudie. Configureer dit notebook niet als onderdeel van uw pijplijn.

stap 1: een pijplijn maken

DLT maakt pijplijnen door afhankelijkheden op te lossen die zijn gedefinieerd in notebooks of bestanden (broncodegenoemd) met behulp van DLT-syntaxis. Elk broncodebestand kan slechts één taal bevatten, maar u kunt meerdere taalspecifieke notebooks of bestanden toevoegen in de pijplijn.

Belangrijk

Configureer geen assets in het veld Broncode. Als u dit veld zwart laat, wordt er een notebook gemaakt en geconfigureerd voor broncodecreatie.

In de instructies in deze zelfstudie worden serverloze berekeningen en Unity Catalog gebruikt. Gebruik de standaardinstellingen voor alle configuratieopties die niet in deze instructies zijn opgegeven.

Notitie

Als serverloos niet is ingeschakeld of ondersteund in uw werkruimte, kunt u de zelfstudie voltooien zoals geschreven met behulp van de standaard-rekeninstellingen. U moet handmatig Unity Catalog selecteren onder Opslagopties in de sectie Doel van de Pijplijn-gebruikersinterface maken.

Ga als volgt te werk om een nieuwe pijplijn te configureren:

  1. Klik in de zijbalk op DLT.
  2. Klik op Pijplijn maken.
  3. Typ in pijplijnnaameen unieke pijplijnnaam.
  4. Selecteer het selectievakje Serverless.
  5. Als u in Doel-een Unity Catalog-locatie wilt configureren waar tabellen worden gepubliceerd, selecteert u een Catalog- en een Schema-.
  6. Klik in Advancedop Add configuration en definieer vervolgens pijplijnparameters voor de catalogus, het schema en het volume waarnaar u gegevens hebt gedownload met behulp van de volgende parameternamen:
    • my_catalog
    • my_schema
    • my_volume
  7. Klik op maken.

De gebruikersinterface van pijplijnen wordt weergegeven voor de nieuwe pijplijn. Er wordt automatisch een broncode-notebook gemaakt en geconfigureerd voor de pijplijn.

Het notitieblok wordt gemaakt in een nieuwe map in uw gebruikersmap. De naam van de nieuwe map en het nieuwe bestand komen overeen met de naam van uw pijplijn. Bijvoorbeeld /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Een koppeling voor toegang tot dit notitieblok bevindt zich onder het veld broncode in het deelvenster Pijplijndetails. Klik op de koppeling om het notitieblok te openen voordat u doorgaat met de volgende stap.

Stap 2: Gerealiseerde weergaven en streamingtabellen declareren in een notebook met Python of SQL

U kunt Datbricks-notebooks gebruiken om interactief broncode te ontwikkelen en te valideren voor DLT-pijplijnen. U moet uw notebook koppelen aan de pijplijn om deze functionaliteit te kunnen gebruiken. Als u uw zojuist gemaakte notebook wilt koppelen aan de pijplijn die u zojuist hebt gemaakt:

  1. Klik op Verbinding maken in de rechterbovenhoek om het configuratiemenu voor berekeningen te openen.
  2. Beweeg de muisaanwijzer over de naam van de pijplijn die u in stap 1 hebt gemaakt.
  3. Klik op Maak verbinding.

De gebruikersinterface verandert om Valideren en Start knoppen in de rechterbovenhoek op te nemen. Zie DLT-pijplijnen ontwikkelen en fouten opsporen in notebooksvoor meer informatie over notebookondersteuning voor ontwikkeling van pijplijncode.

Belangrijk

  • Tijdens het plannen evalueren DLT-pijplijnen alle cellen in een notebook. In tegenstelling tot notebooks die worden uitgevoerd in een algemene rekenomgeving of als taken zijn gepland, garanderen pijplijnen niet dat de cellen in de opgegeven volgorde worden uitgevoerd.
  • Notebooks kunnen slechts één programmeertaal bevatten. Meng geen Python- en SQL-code in notebooks met broncode voor pijplijnen.

Zie Pijplijncode ontwikkelen met Python of Pijplijncode ontwikkelen met SQLvoor meer informatie over het ontwikkelen van code met Python of SQL.

Voorbeeld van pijplijncode

Als u het voorbeeld in deze zelfstudie wilt implementeren, kopieert en plakt u de volgende code in een cel in het notebook dat is geconfigureerd als broncode voor uw pijplijn.

De opgegeven code doet het volgende:

  • Hiermee importeert u de benodigde modules (alleen Python).
  • Verwijzingen naar parameters die zijn gedefinieerd tijdens de pijplijnconfiguratie.
  • Definieert een streamingtabel genaamd baby_names_raw die gegevens uit een volume ontvangt.
  • Definieert een gerealiseerde weergave met de naam baby_names_prepared waarmee opgenomen gegevens worden gevalideerd.
  • Definieert een gerealiseerde weergave met de naam top_baby_names_2021 met een zeer verfijnde weergave van de gegevens.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

stap 3: Een pijplijnupdate starten

Als u een pijplijnupdate wilt starten, klikt u op de knop Start in de rechterbovenhoek van de gebruikersinterface van het notitieblok.

Voorbeeldnotitieblokken

De volgende notebooks bevatten dezelfde codevoorbeelden als in dit artikel. Deze notebooks hebben dezelfde vereisten als de stappen in dit artikel. Zie Vereisten.

Voer de volgende stappen uit om een notebook te importeren:

  1. Open de gebruikersomgeving van de notebook.
    • Klik op + Nieuw>Notitieblok.
    • Er wordt een leeg notitieblok geopend.
  2. Klik op Bestand>importeren.... Het dialoogvenster Importeren wordt weergegeven.
  3. Selecteer de optie URL voor Importeren uit.
  4. Plak de URL van het notitieblok.
  5. Klik op importeren.

Voor deze handleiding moet u een notebook voor het instellen van gegevens uitvoeren voordat u uw DLT-pijplijn configureert en uitvoert. Importeer het volgende notebook, koppel het notebook aan een rekenresource, vul de vereiste variabele in voor my_catalog, my_schemaen my_volumeen klik op Alleuitvoeren.

Gegevens downloaden voor zelfstudie van pijplijnen

Een notitieblok ophalen

De volgende notebooks bevatten voorbeelden in Python of SQL. Wanneer u een notitieblok importeert, wordt dit opgeslagen in de basismap van uw gebruiker.

Nadat u een van de onderstaande notebooks hebt geïmporteerd, voert u de stappen uit om een pijplijn te maken, maar gebruikt u de broncode bestandskiezer om het gedownloade notitieblok te selecteren. Nadat u de pijplijn hebt gemaakt met een notebook die is geconfigureerd als broncode, klikt u op Start in de gebruikersinterface van de pijplijn om een update te initiëren.

Aan de slag met DLT Python-notebook

Notitieboek ophalen

Aan de slag met DLT SQL-notebook

Notitieblok ophalen

Aanvullende informatiebronnen