Samouczek: Jak uruchomić pierwszy potok Delta Live Tables
Ten samouczek przeprowadzi Cię przez kroki konfigurowania pierwszego potoku Delta Live Tables, pisania podstawowego kodu ETL oraz uruchamiania aktualizacji potoku.
Wszystkie kroki opisane w tym samouczku są przeznaczone dla obszarów roboczych z włączonym Unity Catalog. Możesz również skonfigurować potoki Delta Live Tables do współpracy z legacy Hive metastore. Zobacz Użyj potoków Delta Live Tables z klasycznym metastore Hive.
Uwaga
Ten samouczek zawiera instrukcje dotyczące tworzenia i weryfikowania nowego kodu potoku przy użyciu notesów usługi Databricks. Potoki można również skonfigurować przy użyciu kodu źródłowego w plikach Python lub SQL.
Możesz skonfigurować potok tak, aby uruchamiał Twój kod, jeśli już posiadasz kod źródłowy napisany przy użyciu składni Delta Live Tables. Zobacz Konfigurowanie potoku Delta Live Tables.
Można używać w pełni deklaratywnej składni SQL w usłudze Databricks SQL do rejestrowania i ustawiania harmonogramów odświeżania dla zmaterializowanych widoków oraz tabel strumieniowych jako obiektów zarządzanych w ramach Unity Catalog. Zobacz Użyj widoków materializowanych w Databricks SQL oraz Załaduj dane przy użyciu strumieniowych tabel w Databricks SQL.
Przykład: pozyskiwanie i przetwarzanie danych dotyczących nazw dzieci w Nowym Jorku
W przykładzie w tym artykule użyto publicznie dostępnego zestawu danych zawierającego rekordy nowojorskich nazw dzieci. W tym przykładzie pokazano, jak korzystać z potoku Delta Live Tables, aby:
- Odczytywanie nieprzetworzonych danych CSV z woluminu do tabeli.
- Odczytaj rekordy z tabeli pozyskiwania i użyj tabel delta Live Tables oczekiwania, aby utworzyć nową tabelę zawierającą oczyszczone dane.
- Użyj oczyszczonych rekordów jako danych wejściowych do zapytań delta Live Tables, które tworzą pochodne zestawy danych.
Ten kod przedstawia uproszczony przykład architektury medalonu. Zobacz Co to jest architektura medallion lakehouse?.
Implementacje tego przykładu są udostępniane dla języków Python i SQL. Wykonaj kroki, aby utworzyć nowy potok i notes, a następnie skopiuj podany kod.
Podano również przykładowe notesy z kompletnym kodem.
Wymagania
Aby uruchomić pipeline, musisz mieć uprawnienia do tworzenia klastra lub dostęp do polityki klastra definiującej klaster Delta Live Tables. Środowisko uruchomieniowe delta Live Tables tworzy klaster przed uruchomieniem potoku i kończy się niepowodzeniem, jeśli nie masz odpowiednich uprawnień.
Wszyscy użytkownicy mogą domyślnie wyzwalać aktualizacje przy użyciu potoków bezserwerowych. Bezserwerowa musi być włączona na poziomie konta i może nie być dostępna w regionie obszaru roboczego. Zobacz Włączanie przetwarzania bezserwerowego.
W przykładach w tym samouczku użyto Unity Catalog. Usługa Databricks zaleca utworzenie nowego schematu w celu uruchomienia tego samouczka, ponieważ wiele obiektów bazy danych jest tworzonych w schemacie docelowym.
- Aby utworzyć nowy schemat w wykazie, musisz mieć uprawnienia
ALL PRIVILEGES
lubUSE CATALOG
iCREATE SCHEMA
. - Jeśli nie możesz utworzyć nowego schematu, uruchom ten samouczek względem istniejącego schematu. Musisz mieć następujące uprawnienia:
-
USE CATALOG
katalogu nadrzędnego. -
ALL PRIVILEGES
lubUSE SCHEMA
,CREATE MATERIALIZED VIEW
oraz uprawnieniaCREATE TABLE
w schemacie docelowym.
-
- W tym samouczku do przechowywania przykładowych danych jest używany wolumin. Usługa Databricks zaleca utworzenie nowego woluminu na potrzeby tego samouczka. Jeśli utworzysz nowy schemat dla tego samouczka, możesz utworzyć nowy wolumin w tym schemacie.
- Aby utworzyć nowy wolumin w istniejącym schemacie, musisz mieć następujące uprawnienia:
-
USE CATALOG
katalogu nadrzędnego. -
ALL PRIVILEGES
lubUSE SCHEMA
i uprawnieniaCREATE VOLUME
w schemacie docelowym.
-
- Opcjonalnie możesz użyć istniejącego woluminu. Musisz mieć następujące uprawnienia:
-
USE CATALOG
katalogu nadrzędnego. -
USE SCHEMA
schematu nadrzędnego. -
ALL PRIVILEGES
lubREAD VOLUME
iWRITE VOLUME
na woluminie docelowym.
-
- Aby utworzyć nowy wolumin w istniejącym schemacie, musisz mieć następujące uprawnienia:
Aby ustawić te uprawnienia, skontaktuj się z administratorem usługi Databricks. Aby uzyskać więcej informacji na temat uprawnień Unity Catalog, zobacz uprawnienia Unity Catalog i zabezpieczane obiekty.
- Aby utworzyć nowy schemat w wykazie, musisz mieć uprawnienia
Krok 0. Pobieranie danych
W tym przykładzie ładowane są dane z wolumenu Unity Catalog. Poniższy kod pobiera plik CSV i przechowuje go w określonym woluminie. Otwórz nowy notes i uruchom następujący kod, aby pobrać te dane do określonego woluminu:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Zastąp <catalog-name>
, <schema-name>
i <volume-name>
nazwami katalogu, schematu i woluminu dla woluminu Unity Catalog. Podany kod próbuje utworzyć określony schemat i wolumin, jeśli te obiekty nie istnieją. Musisz mieć odpowiednie uprawnienia do tworzenia i zapisu obiektów w Unity Catalog. Zobacz Wymagania.
Uwaga
Przed kontynuowaniem pracy z samouczkiem upewnij się, że ten notes został pomyślnie uruchomiony. Nie należy konfigurować tego notesu w ramach potoku.
Krok 1. Tworzenie potoku
Delta Live Tables tworzy potoki przez rozwiązywanie zależności zdefiniowanych w notesach lub plikach (nazywanych kodem źródłowym) przy użyciu składni Delta Live Tables. Każdy plik kodu źródłowego może zawierać tylko jeden język, ale w potoku można dodać wiele notesów lub plików specyficznych dla języka.
Ważne
Nie należy konfigurować żadnych zasobów w polu Kod źródłowy. Pozostawienie tego pola czarnego powoduje utworzenie i skonfigurowanie notesu na potrzeby tworzenia kodu źródłowego.
Instrukcje w tym samouczku korzystają z bezserwerowego przetwarzania i Katalogu Unity. Użyj ustawień domyślnych dla wszystkich opcji konfiguracji, które nie zostały określone w tych instrukcjach.
Uwaga
Jeśli bezserwerowa nie jest włączona lub obsługiwana w obszarze roboczym, możesz ukończyć samouczek zgodnie z instrukcjami napisanymi przy użyciu domyślnych ustawień obliczeniowych. W interfejsie użytkownika Create pipeline, w sekcji Destination, należy ręcznie wybrać Unity Catalog w obszarze Storage options.
Aby skonfigurować nowy potok, wykonaj następujące czynności:
- Na pasku bocznym kliknij opcję Delta Live Tables.
- Kliknij Utwórz przepływ.
- W nazwa potokuwpisz unikatową nazwę potoku.
- Zaznacz pole wyboru Serverless.
- W Cel, aby skonfigurować lokalizację wykazu aparatu Unity, w której są publikowane tabele, wybierz Katalog i Schemat.
- W Zaawansowanekliknij opcję Dodaj konfigurację, a następnie zdefiniuj parametry potoku dla katalogu, schematu i woluminu, do którego pobrano dane przy użyciu następujących nazw parametrów:
my_catalog
my_schema
my_volume
- Kliknij pozycję Utwórz.
Interfejs użytkownika dla potoków pojawia się dla nowego potoku. Notes kodu źródłowego jest tworzony automatycznie i konfigurowany dla potoku.
Notes jest tworzony w nowym katalogu w katalogu użytkownika. Nazwa nowego katalogu i pliku jest zgodna z nazwą potoku. Na przykład /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Link umożliwiający dostęp do tego notesu znajduje się w polu Kod źródłowy w panelu Szczegóły potoku. Kliknij link, aby otworzyć notes przed przejściem do następnego kroku.
Krok 2. Deklarowanie zmaterializowanych widoków i tabel przesyłania strumieniowego w notesie przy użyciu języka Python lub SQL
Notesy usługi Databricks umożliwiają interaktywne opracowywanie i weryfikowanie kodu źródłowego dla potoków Delta Live Tables. Aby korzystać z tej funkcji, musisz dołączyć notes do potoku. Aby dołączyć nowo utworzony notes do właśnie utworzonego potoku:
- Kliknij pozycję Połącz w prawym górnym rogu, aby otworzyć menu konfiguracji obliczeniowej.
- Umieść kursor na nazwie potoku utworzonego w kroku 1.
- Kliknij Połącz.
Zmiany interfejsu użytkownika w celu uwzględnienia przycisków Weryfikuj i Uruchom w prawym górnym rogu. Aby dowiedzieć się więcej na temat obsługi notatników na potrzeby tworzenia kodu potoków, zapoznaj się z artykułem "Tworzenie i debugowanie potoków Delta Live Tables w notatnikach".
Ważne
- Potoki Delta Live Tables oceniają wszystkie komórki w notesie podczas planowania. W przeciwieństwie do notesów, które są uruchamiane względem obliczeń wszystkich celów lub zaplanowanych jako zadania, potoki nie gwarantują, że komórki działają w określonej kolejności.
- Notesy mogą zawierać tylko jeden język programowania. Nie mieszaj języka Python i kodu SQL w notesach kodu źródłowego potoku.
Aby uzyskać szczegółowe informacje na temat tworzenia kodu w języku Python lub SQL, zobacz Tworzenie kodu potoku przy użyciu języka Python lub Programowanie kodu potoku przy użyciu języka SQL.
Przykładowy kod potoku
Aby zaimplementować przykład w tym samouczku, skopiuj i wklej następujący kod do komórki w notesie skonfigurowanym jako kod źródłowy potoku.
Podany kod wykonuje następujące czynności:
- Importuje niezbędne moduły (tylko język Python).
- Odwołuje się do parametrów zdefiniowanych w trakcie konfiguracji potoku.
- Definiuje tabelę przesyłania strumieniowego o nazwie
baby_names_raw
pozyskiwaną z woluminu. - Definiuje zmaterializowany widok o nazwie
baby_names_prepared
, który weryfikuje pozyskane dane. - Definiuje zmaterializowany widok o nazwie
top_baby_names_2021
, który ma wysoce wyrafinowany widok danych.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Krok 3: Rozpocznij aktualizację potoku
Aby rozpocząć aktualizację potoku, kliknij przycisk Uruchom w prawym górnym rogu interfejsu użytkownika notatnika.
Przykładowe notesy
Poniższe notesy zawierają te same przykłady kodu przedstawione w tym artykule. Te notesy mają te same wymagania co kroki opisane w tym artykule. Zobacz Wymagania.
Aby zaimportować notes, wykonaj następujące kroki:
- Otwórz interfejs użytkownika notesu.
- Kliknij pozycję + Nowy>notes.
- Zostanie otwarty pusty notes.
- Kliknij kolejno pozycje File (Plik)>Import (Importuj). Zostanie wyświetlone okno dialogowe Importowanie .
- Wybierz opcję adresu URL dla Import z.
- Wklej adres URL notesu.
- Kliknij przycisk Importuj.
Ten samouczek wymaga uruchomienia notesu przygotowania danych przed skonfigurowaniem i uruchomieniem potoku Delta Live Tables. Zaimportuj poniższy notes, dołącz notes do zasobu obliczeniowego, wypełnij wymaganą zmienną dla my_catalog
, my_schema
i my_volume
, a następnie kliknij pozycję Uruchom wszystko.
Samouczek pobierania danych dla potoków
Poniższe notesy zawierają przykłady w języku Python lub SQL. Podczas importowania notesu jest on zapisywany w katalogu głównym użytkownika.
Po zaimportowaniu jednego z poniższych notesów wykonaj kroki tworzenia potoku, ale użyj selektora plików kodu źródłowego , aby wybrać pobrany notes. Po utworzeniu potoku za pomocą notatnika skonfigurowanego jako kod źródłowy kliknij przycisk Uruchom w interfejsie użytkownika potoku, aby uruchomić aktualizację.