Návod: Spuštění vašeho prvního Delta Live Tables pipeline
Tento kurz vás provede kroky ke konfiguraci prvního kanálu Delta Live Tables, zápisu základního kódu ETL a spuštění kanálu update.
Všechny kroky v tomto kurzu jsou určené pro pracovní prostory s povolenou Unity Catalog. Kanály Delta Live Tables můžete také nakonfigurovat tak, aby fungovaly se starší verzí metastoru Hive. Viz Použití kanálů Delta Live Tables s původním Hive metastore.
Poznámka:
Tento kurz obsahuje pokyny pro vývoj a ověřování nového kódu kanálu pomocí poznámkových bloků Databricks. Kanály můžete také nakonfigurovat pomocí zdrojového kódu v Pythonu nebo v souborech SQL.
Kanál můžete nakonfigurovat ke spuštění kódu, pokud už máte zdrojový kód napsaný pomocí syntaxe Delta Live Tables. Viz Nakonfiguruj kanál Tables Delta Live.
Plně deklarativní syntaxi SQL v Databricks SQL můžete použít k registraci a plánování setrefresh pro materializované views a streamované tables jako objekty spravované Unity Catalog. Viz Použití materializovaných views v Databricks SQL a Načtení dat pomocí streamovaných tables vDatabricks SQL .
Příklad: Ingestování a zpracování dat o názvech dětí v New Yorku
Příklad v tomto článku používá veřejně dostupnou datovou sadu, která obsahuje záznamy jmen dětí v New Yorku. Tento příklad ukazuje použití pipeline Delta Live Tables k:
- Přečtěte surová data CSV ze svazku do table.
- Přečtěte si záznamy z table příjmu dat a pomocí aplikace Delta Live Tablesočekávání vytvořte nový table, který obsahuje vyčištěná data.
- Vyčištěné záznamy použijte jako vstup do dotazů Delta Live Tables, které vytvářejí odvozené datové sady.
Tento kód ukazuje zjednodušený příklad architektury medallionu. Podívejte se, co je architektura jezera medallion?
Implementace tohoto příkladu jsou k dispozici pro Python a SQL. Podle pokynů vytvořte nový kanál a poznámkový blok a potom zkopírujte zadaný kód.
K dispozici jsou také ukázkové poznámkové bloky s úplným kódem.
Požadavky
Pokud chcete spustit kanál, musíte mít oprávnění k vytvoření clusteru nebo mít přístup k politikám, které definují cluster Delta Live Tables. Modul runtime Delta Live Tables vytvoří cluster před tím, než spustí váš datový tok, a selže, pokud nemáte potřebná oprávnění.
Všichni uživatelé můžou ve výchozím nastavení aktivovat aktualizace pomocí bezserverových kanálů. Bezserverová služba musí být povolená na úrovni účtu a nemusí být dostupná ve vaší oblasti pracovního prostoru. Viz Povolení výpočetních prostředků bez serveru.
Příklady v tomto kurzu používají Unity Catalog. Databricks doporučuje vytvořit nový schema ke spuštění tohoto kurzu, protože v cílovém schemase vytvoří více databázových objektů .
- Pokud chcete v schemavytvořit nový catalog, musíte mít oprávnění
ALL PRIVILEGES
neboUSE CATALOG
aCREATE SCHEMA
. - Pokud nemůžete vytvořit nový schema, spusťte tento tutoriál pro existující schema. Musíte mít následující oprávnění:
-
USE CATALOG
pro nadřazenou catalog. -
ALL PRIVILEGES
neboUSE SCHEMA
,CREATE MATERIALIZED VIEW
aCREATE TABLE
oprávnění na cílovém schema.
-
- Tento kurz používá svazek k ukládání ukázkových dat. Databricks doporučuje vytvořit nový svazek pro účely tohoto kurzu. Pokud pro tento kurz vytvoříte nový schema, můžete v tomto schemavytvořit nový svazek .
- Pokud chcete vytvořit nový svazek v existujícím schema, musíte mít následující oprávnění:
-
USE CATALOG
pro nadřazenou catalog. -
ALL PRIVILEGES
neboUSE SCHEMA
aCREATE VOLUME
oprávnění k cílovému schema.
-
- Volitelně můžete použít existující svazek. Musíte mít následující oprávnění:
-
USE CATALOG
pro nadřazenou catalog. -
USE SCHEMA
pro nadřazenou schema. -
ALL PRIVILEGES
neboREAD VOLUME
naWRITE VOLUME
cílovém svazku.
-
- Pokud chcete vytvořit nový svazek v existujícím schema, musíte mít následující oprávnění:
Chcete-li set tato oprávnění, obraťte se na správce Databricks. Další informace o oprávněních Unity Catalog najdete v tématu Unity Catalog oprávnění a zabezpečitelné objekty.
- Pokud chcete v schemavytvořit nový catalog, musíte mít oprávnění
Krok 0: Stažení dat
Tento příklad načte data ze svazku Unity Catalog. Následující kód stáhne soubor CSV a uloží ho do zadaného svazku. Otevřete nový poznámkový blok a spuštěním následujícího kódu stáhněte tato data do zadaného svazku:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Nahraďte <catalog-name>
, <schema-name>
a <volume-name>
pomocí názvů catalog, schemaa Catalog pro svazek Unity. Zadaný kód se pokusí vytvořit specifikovaný objekt schema a svazek, pokud tyto objekty neexistují. Musíte mít příslušná oprávnění k vytvoření a zápisu do objektů v Unity Catalog. Viz Požadavky.
Poznámka:
Než budete pokračovat v kurzu, ujistěte se, že se tento poznámkový blok úspěšně spustil. Tento poznámkový blok nekonfigurujte jako součást kanálu.
Krok 1: Vytvoření kanálu
Delta Live Tables vytváří kanály řešením závislostí definovaných v poznámkových blocích nebo souborech (označovaných jako zdrojový kód) pomocí syntaxe Delta Live Tables. Každý soubor zdrojového kódu může obsahovat pouze jeden jazyk, ale do kanálu můžete přidat více poznámkových bloků nebo souborů specifických pro jazyk.
Důležité
Nekonfigurujte žádné prostředky v poli Zdrojový kód . Když toto pole necháte černé, vytvoří a nakonfiguruje poznámkový blok pro vytváření zdrojového kódu.
Pokyny v tomto kurzu používají bezserverové výpočetní prostředky a unity Catalog. Pro všechny možnosti konfigurace, které nejsou uvedené v těchto pokynech, použijte výchozí nastavení.
Poznámka:
Pokud v pracovním prostoru není bezserverová služba povolená nebo podporovaná, můžete kurz dokončit tak, jak je napsané pomocí výchozích nastavení výpočetních prostředků.
Pokud chcete nakonfigurovat nový kanál, postupujte takto:
- Na bočním panelu klikněte na Delta Live Tables.
- Klikněte na Vytvořit kanál.
- Do pole Název kanáluzadejte jedinečný název kanálu.
- Select zaškrtávací políčko bezserverové.
- V cílovémse konfiguruje umístění Catalog Unity, publikují se wheretables, selectCatalog a Schema.
- V Advancedklikněte na Přidat konfiguraci a potom definujte pipeline parameters pro catalog, schemaa svazek, do něhož jste stáhli data pomocí následujících názvů parametrů:
my_catalog
my_schema
my_volume
- Klikněte na Vytvořit.
Pro nový kanál se zobrazí uživatelské rozhraní kanálů. Poznámkový blok zdrojového kódu se automaticky vytvoří a nakonfiguruje pro kanál.
Poznámkový blok se vytvoří v novém adresáři ve vašem uživatelském adresáři. Název nového adresáře a souboru odpovídá názvu kanálu. Například /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Odkaz pro přístup k tomuto poznámkovému bloku je pod polem Zdrojový kód na panelu podrobností kanálu. Kliknutím na odkaz otevřete poznámkový blok a teprve potom přejděte k dalšímu kroku.
Krok 2: Deklarujte materializované views a streamované tables v poznámkovém bloku pomocí Pythonu nebo SQL
Poznámkové bloky Databricks můžete použít k interaktivnímu vývoji a ověření zdrojového kódu pro kanály Delta Live Tables. Abyste mohli tuto funkci používat, musíte poznámkový blok připojit ke kanálu. Připojení nově vytvořeného poznámkového bloku ke kanálu, který jste právě vytvořili:
- Kliknutím na Připojit v pravém horním rohu otevřete nabídku konfigurace výpočetních prostředků.
- Najeďte myší na název kanálu, který jste vytvořili v kroku 1.
- Klepněte na tlačítko Připojit.
Změny uživatelského rozhraní tak, aby obsahovaly tlačítka Ověřit a Spustit v pravém horním rohu. Další informace o podpoře vývoje kódu pipeline najdete v tématu Vývoj a ladění kanálů Delta Live Tables v poznámkových blocích.
Důležité
- Kanály Delta Live Tables vyhodnocují všechny buňky v poznámkovém bloku během plánování. Na rozdíl od poznámkových bloků, které se spouštějí pro výpočetní prostředky pro všechny účely nebo jsou naplánované jako úlohy, kanály nezaručují, že se buňky spouštějí v zadaném pořadí.
- Poznámkové bloky můžou obsahovat pouze jeden programovací jazyk. Nekombinujte kód Pythonu a SQL v poznámkových blocích zdrojového kódu kanálu.
Podrobnosti o vývoji kódu pomocí Pythonu nebo SQL najdete v tématu Vývoj kódu kanálu pomocí Pythonu nebo vývoje kódu kanálu pomocí SQL.
Příklad kódu kanálu
Pokud chcete implementovat příklad v tomto kurzu, zkopírujte a vložte následující kód do buňky v poznámkovém bloku nakonfigurovaného jako zdrojový kód pro váš kanál.
Zadaný kód provede následující:
- Importuje nezbytné moduly (pouze Python).
- Odkazy parameters definované během konfigurace kanálu.
- Definice streamovacího table pojmenovaného
baby_names_raw
, který získává data ze svazku. - Definuje materializované zobrazení s názvem
baby_names_prepared
, které ověřuje ingestovaná data. - Definuje materializované zobrazení s názvem
top_baby_names_2021
, které má vysoce upřesňující zobrazení dat.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Krok 3: Spustit potrubí update
Chcete-li spustit potrubí update, klikněte na tlačítko Spustit v pravém horním rohu uživatelského rozhraní notebooku.
Ukázkové poznámkové bloky
Následující poznámkové bloky obsahují stejné příklady kódu, které jsou uvedeny v tomto článku. Tyto poznámkové bloky mají stejné požadavky jako kroky v tomto článku. Viz Požadavky.
Pokud chcete importovat poznámkový blok, proveďte následující kroky:
- Otevřete uživatelské rozhraní poznámkového bloku.
- Klikněte na + Nový>poznámkový blok.
- Otevře se prázdný poznámkový blok.
- Klikněte na File>Import (Soubor -> Importovat). Zobrazí se dialogové okno Importovat .
možnost adresy URL pro Import z .- Vložte adresu URL poznámkového bloku.
- Klepněte na tlačítko Import.
Tento kurz vyžaduje, abyste před konfigurací a spuštěním pipeline Delta Live Tables spustili sešit nastavení dat. Naimportujte následující poznámkový blok, připojte poznámkový blok k výpočetnímu prostředku, vyplňte požadovanou proměnnou a my_catalog
my_schema
my_volume
klikněte na Spustit vše.
Kurz stahování dat pro kanály
Následující poznámkové bloky obsahují příklady v Pythonu nebo SQL. Při importu poznámkového bloku se uloží do domovského adresáře uživatele.
Po importu jednoho z následujících poznámkových bloků dokončete kroky pro vytvoření kanálu, ale pomocí nástroje