Kurz: Spuštění prvního kanálu DLT
Tento kurz vás provede postupem konfigurace prvního kanálu DLT, zápisu základního kódu ETL a spuštění aktualizace kanálu.
Všechny kroky v tomto návodu jsou určené pro pracovní prostory s aktivovaným katalogem Unity. Kanály DLT můžete také nakonfigurovat tak, aby fungovaly se starší verzí metastoru Hive. Viz Použití kanálů DLT se starší verzí metastoru Hive.
Poznámka
Tento tutoriál obsahuje pokyny pro vývoj a ověřování nového kódu pipeline pomocí Databricks notebooků. Kanály můžete také nakonfigurovat pomocí zdrojového kódu v Pythonu nebo v souborech SQL.
Kanál můžete nakonfigurovat ke spuštění kódu, pokud už máte zdrojový kód napsaný pomocí syntaxe DLT. Viz Konfigurujte kanál DLT.
Plně deklarativní syntaxi SQL v Databricks SQL můžete použít k registraci a nastavení plánů aktualizace pro materializovaná zobrazení a streamované tabulky jako objekty spravované katalogem Unity. Viz Použití materializovaných zobrazení v Databricks SQL a Načtení dat pomocí streamovaných tabulek v Databricks SQL.
Příklad: Ingestování a zpracování dat o názvech dětí v New Yorku
Příklad v tomto článku používá veřejně dostupnou datovou sadu, která obsahuje záznamy jména dětí ve státě New York. Tento příklad ukazuje použití kanálu DLT k:
- Načíst nezpracovaná data CSV ze svazku do tabulky.
- Načtěte záznamy z tabulky příjmu dat a použijte moduly DLT a očekávání k vytvoření nové tabulky, která obsahuje vyčištěná data.
- Vyčištěné záznamy použijte jako vstup do dotazů DLT, které vytvářejí odvozené datové sady.
Tento kód ukazuje zjednodušený příklad architektury medallionu. Podívejte se na část Co je architektura medailonového lakehouse?.
Implementace tohoto příkladu jsou k dispozici pro Python a SQL. Podle pokynů vytvořte nový kanál a poznámkový blok a potom zkopírujte zadaný kód.
K dispozici jsou také ukázky notebooků s úplným kódem.
Požadavky
- Pokud chcete spustit potrubí, musíte mít oprávnění k vytvoření klastru nebo, nebo přístup k zásadám klastru, které definují klastr DLT. Modul runtime DLT vytvoří cluster před spuštěním pipeline a selže, pokud nemáte správné oprávnění.
- Všichni uživatelé můžou ve výchozím nastavení aktivovat aktualizace pomocí bezserverových kanálů. Bezserverová služba musí být povolená na úrovni účtu a nemusí být dostupná ve vaší oblasti pracovního prostoru. Viz Povolit bezserverové výpočty.
Příklady v tomto návodu používají katalog Unity. Databricks doporučuje vytvořit nové schéma pro spuštění tohoto kurzu, protože v cílovém schématu se vytváří více databázových objektů.
- Pokud chcete vytvořit nové schéma v katalogu, musíte mít oprávnění
ALL PRIVILEGES
neboUSE CATALOG
aCREATE SCHEMA
. - Pokud nemůžete vytvořit nové schéma, spusťte tento kurz s existujícím schématem. Musíte mít následující oprávnění:
-
USE CATALOG
pro nadřazený katalog. -
ALL PRIVILEGES
aneboUSE SCHEMA
,CREATE MATERIALIZED VIEW
aCREATE TABLE
oprávnění k cílovému schématu.
-
- Tento návod používá svazek k uložení ukázkových dat. Databricks doporučuje vytvořit nový svazek pro účely tohoto kurzu. Pokud pro tento kurz vytvoříte nové schéma, můžete v tomto schématu vytvořit nový svazek.
- Pokud chcete vytvořit nový svazek v existujícím schématu, musíte mít následující oprávnění:
-
USE CATALOG
pro nadřazený katalog. -
ALL PRIVILEGES
neboUSE SCHEMA
aCREATE VOLUME
oprávnění k cílovému schématu.
-
- Volitelně můžete použít existující svazek. Musíte mít následující oprávnění:
-
USE CATALOG
pro nadřazený katalog. -
USE SCHEMA
pro nadřazené schéma. -
ALL PRIVILEGES
neboREAD VOLUME
aWRITE VOLUME
na cílovém svazku.
-
- Pokud chcete vytvořit nový svazek v existujícím schématu, musíte mít následující oprávnění:
Pokud chcete tato oprávnění nastavit, obraťte se na správce Databricks. Další informace o oprávněních katalogu Unity najdete v tématu Oprávnění katalogu Unity a zabezpečitelné objekty.
- Pokud chcete vytvořit nové schéma v katalogu, musíte mít oprávnění
Krok 0: Stažení dat
Tento příklad načte data ze svazku katalogu Unity. Následující kód stáhne soubor CSV a uloží ho do zadaného svazku. Otevřete nový poznámkový blok a spuštěním následujícího kódu stáhněte tato data do zadaného svazku:
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
Nahraďte <catalog-name>
, <schema-name>
a <volume-name>
katalogem, schématem a názvy svazků pro svazek katalogu Unity. Zadaný kód se pokusí vytvořit zadané schéma a svazek, pokud tyto objekty neexistují. Musíte mít příslušná oprávnění k vytvoření a zápisu do objektů v katalogu Unity. Viz požadavky .
Poznámka
Než budete pokračovat v kurzu, ujistěte se, že se tento poznámkový blok úspěšně spustil. Tento poznámkový blok nekonfigurujte jako součást procesu.
Krok 1: Vytvořte potrubí
DLT vytváří datové toky řešením závislostí definovaných v poznámkových blocích nebo souborech (označovaných jako zdrojový kód) pomocí syntaxe DLT. Každý soubor zdrojového kódu může obsahovat pouze jeden jazyk, ale do pipeline můžete přidat více souborů nebo poznámkových bloků určených pro konkrétní programovací jazyk.
Důležitý
Nekonfigurujte žádné prostředky v poli Zdrojový kód. Když toto pole necháte černé, vytvoří a nakonfiguruje poznámkový blok pro vytváření zdrojového kódu.
Pokyny v tomto kurzu používají bezserverové výpočetní prostředky a katalog Unity. Pro všechny možnosti konfigurace, které nejsou uvedené v těchto pokynech, použijte výchozí nastavení.
Poznámka
Pokud v pracovním prostoru není bezserverová služba povolená nebo podporovaná, můžete kurz dokončit tak, jak je napsané pomocí výchozích nastavení výpočetních prostředků. Musíte ručně vybrat katalog Unity v části Možnosti úložiště v sekci Cíl uživatelského rozhraní Vytvořit kanál.
Pokud chcete nakonfigurovat nový kanál, postupujte takto:
- Na bočním panelu klikněte na DLT.
- Klikněte na Vytvořit kanál.
- Do pole Název kanáluzadejte jedinečný název kanálu.
- Vyberte políčko Serverless.
- V umístěnínakonfigurujte místo pro katalog Unity, kde jsou publikovány tabulky, vyberte katalog a schéma .
- V Advancedklikněte na Přidat konfiguraci a potom definujte parametry procesu pro katalog, schéma a svazek, do kterého jste stáhli data pomocí následujících názvů parametrů.
my_catalog
my_schema
my_volume
- Klikněte na Vytvořit.
Pro nový kanál se zobrazí uživatelské rozhraní kanálů. Poznámkový blok zdrojového kódu se automaticky vytvoří a nakonfiguruje pro pipeline.
Notebook bude vytvořen v novém adresáři ve vašem uživatelském adresáři. Název nového adresáře a souboru odpovídá názvu kanálu. Například /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Odkaz pro přístup k tomuto poznámkovému bloku je pod polem zdrojový kód na panelu podrobností kanálu. Kliknutím na odkaz otevřete poznámkový blok a teprve potom přejděte k dalšímu kroku.
Krok 2: Deklarace materializovaných zobrazení a streamovaných tabulek v poznámkovém bloku pomocí Pythonu nebo SQL
Poznámkové bloky Datbricks můžete použít k interaktivnímu vývoji a ověření zdrojového kódu pro kanály DLT. Abyste mohli tuto funkci používat, musíte poznámkový blok připojit k pipeline. Připojení nově vytvořeného poznámkového bloku ke kanálu, který jste právě vytvořili:
- Kliknutím na Připojit v pravém horním rohu otevřete nabídku konfigurace výpočetních prostředků.
- Najeďte myší na název kanálu, který jste vytvořili v kroku 1.
- Klikněte na Připojit.
Uživatelské rozhraní se mění tak, aby zahrnovalo tlačítka Ověřit a Start v pravém horním rohu. Další informace o podpoře poznámkového bloku pro vývoj kódu kanálu najdete v tématu Vývoj a ladění kanálů DLT v poznámkových blocích.
Důležitý
- Kanály DLT během plánování vyhodnocují všechny buňky v poznámkovém bloku. Na rozdíl od poznámkových bloků, které se spouštějí na všeobecných výpočetních prostředcích nebo jsou naplánovány jako úlohy, datové potrubí nezaručují, že se buňky spouštějí v určeném pořadí.
- Poznámkové bloky mohou obsahovat pouze jeden programovací jazyk. Nekombinujte Python a SQL v poznámkových blocích zdrojového kódu vývojového řetězce.
Podrobnosti o vývoji kódu pomocí Pythonu nebo SQL najdete v tématu Vývoj kódu kanálu pomocí Pythonu nebo Vývoj kódu kanálu pomocí SQL.
Příklad kódu kanálu
Pokud chcete implementovat příklad v tomto kurzu, zkopírujte a vložte následující kód do buňky v poznámkovém bloku nakonfigurovaného jako zdrojový kód pro váš kanál.
Zadaný kód provede následující:
- Importuje nezbytné moduly (pouze Python).
- Odkazuje na parametry definované během konfigurace kanálu.
- Definuje streamovací tabulku s názvem
baby_names_raw
, která přijímá data z datového svazku. - Definuje materializované zobrazení s názvem
baby_names_prepared
, které ověřuje ingestovaná data. - Definuje materializované zobrazení s názvem
top_baby_names_2021
, které má vysoce upřesněné zobrazení dat.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Krok 3: Spuštění aktualizace pipeline
Pro zahájení aktualizace pipeline klikněte na tlačítko Start v pravém horním rohu uživatelského rozhraní poznámkového bloku.
Příkladové poznámkové bloky
Následující poznámkové bloky obsahují stejné příklady kódu, které jsou uvedeny v tomto článku. Tyto poznámkové bloky mají stejné požadavky jako kroky v tomto článku. Viz požadavky .
Pokud chcete importovat poznámkový blok, proveďte následující kroky:
- Otevřete uživatelské rozhraní poznámkového bloku.
- Klikněte na + Nový>poznámkový blok.
- Otevře se prázdný poznámkový blok.
- Klikněte na Soubor>Import.... Zobrazí se dialogové okno Import.
- Vyberte možnost pro adresu URL pro Import z.
- Vložte adresu URL poznámkového bloku.
- Klikněte na Importovat.
Tento kurz vyžaduje, abyste před konfigurací a spuštěním kanálu DLT spustili poznámkový blok nastavení dat. Naimportujte následující poznámkový blok, připojte poznámkový blok k výpočetnímu prostředku, vyplňte požadovanou proměnnou pro my_catalog
, my_schema
a my_volume
a klikněte na Spustit všechny.
Stahování dat pro výukový tutoriál o datových potrubích
Získání poznámkového bloku
Následující poznámkové bloky obsahují příklady v Pythonu nebo SQL. Při importu poznámkového bloku se uloží do domovského adresáře uživatele.
Po importu některého z následujících poznámkových bloků dokončete kroky pro vytvoření pipeline, ale pomocí nástroje pro výběr souboru zdrojového kódu vyberte stažený poznámkový blok. Po vytvoření kanálu s poznámkovým blokem nakonfigurovaným jako zdrojový kód klikněte na Spustit v uživatelském rozhraní kanálu a aktivujte aktualizaci.
Začínáme s poznámkovým blokem Pythonu v DLT
Získání poznámkového bloku
Začínáme s poznámkovým blokem DLT SQL
Získej poznámkový blok