Samouczek: Uruchamianie pierwszego potoku Delta Live Tables
Ten samouczek przeprowadzi Cię przez kroki konfigurowania pierwszego potoku delta Live Tables, pisania podstawowego kodu ETL i uruchamiania aktualizacji potoku.
Wszystkie kroki opisane w tym samouczku są przeznaczone dla obszarów roboczych z włączonym katalogiem aparatu Unity. Możesz również skonfigurować potoki delta Live Tables do pracy ze starszym magazynem metadanych Hive. Zobacz Używanie potoków tabel delta live z starszym magazynem metadanych Hive.
Uwaga
Ten samouczek zawiera instrukcje dotyczące tworzenia i weryfikowania nowego kodu potoku przy użyciu notesów usługi Databricks. Potoki można również skonfigurować przy użyciu kodu źródłowego w plikach Python lub SQL.
Potok można skonfigurować tak, aby uruchamiał kod, jeśli masz już kod źródłowy napisany przy użyciu składni delta Live Tables. Zobacz Konfigurowanie potoku tabel na żywo delty.
Można użyć w pełni deklaratywnej składni SQL w usłudze Databricks SQL do rejestrowania i ustawiania harmonogramów odświeżania dla zmaterializowanych widoków i tabel przesyłania strumieniowego jako obiektów zarządzanych przez wykaz aparatu Unity. Zobacz Używanie zmaterializowanych widoków w usłudze Databricks SQL i Ładowanie danych przy użyciu tabel przesyłania strumieniowego w usłudze Databricks SQL.
Przykład: pozyskiwanie i przetwarzanie danych dotyczących nazw dzieci w Nowym Jorku
W przykładzie w tym artykule użyto publicznie dostępnego zestawu danych zawierającego rekordy nowojorskich nazw dzieci. W tym przykładzie pokazano użycie potoku delta live tables do:
- Odczytywanie nieprzetworzonych danych CSV z woluminu do tabeli.
- Odczytaj rekordy z tabeli pozyskiwania i użyj oczekiwań delta Live Tables, aby utworzyć nową tabelę zawierającą oczyszczone dane.
- Użyj oczyszczonych rekordów jako danych wejściowych do zapytań delta Live Tables, które tworzą pochodne zestawy danych.
Ten kod przedstawia uproszczony przykład architektury medalonu. Zobacz Co to jest architektura medallion lakehouse?.
Implementacje tego przykładu są udostępniane dla języków Python i SQL. Wykonaj kroki, aby utworzyć nowy potok i notes, a następnie skopiuj podany kod.
Podano również przykładowe notesy z kompletnym kodem.
Wymagania
Aby uruchomić potok, musisz mieć uprawnienie do tworzenia klastra lub dostęp do zasad klastra definiujących klaster delta live tables. Środowisko uruchomieniowe delta Live Tables tworzy klaster przed uruchomieniem potoku i kończy się niepowodzeniem, jeśli nie masz odpowiednich uprawnień.
Wszyscy użytkownicy mogą domyślnie wyzwalać aktualizacje przy użyciu potoków bezserwerowych. Bezserwerowa musi być włączona na poziomie konta i może nie być dostępna w regionie obszaru roboczego. Zobacz Włączanie przetwarzania bezserwerowego.
Przykłady w tym samouczku korzystają z wykazu aparatu Unity. Usługa Databricks zaleca utworzenie nowego schematu w celu uruchomienia tego samouczka, ponieważ wiele obiektów bazy danych jest tworzonych w schemacie docelowym.
- Aby utworzyć nowy schemat w wykazie, musisz mieć
ALL PRIVILEGES
uprawnienia lubUSE CATALOG
iCREATE SCHEMA
. - Jeśli nie możesz utworzyć nowego schematu, uruchom ten samouczek względem istniejącego schematu. Musisz mieć następujące uprawnienia:
USE CATALOG
dla wykazu nadrzędnego.ALL PRIVILEGES
lubUSE SCHEMA
,CREATE MATERIALIZED VIEW
iCREATE TABLE
uprawnienia w schemacie docelowym.
- W tym samouczku do przechowywania przykładowych danych jest używany wolumin. Usługa Databricks zaleca utworzenie nowego woluminu na potrzeby tego samouczka. Jeśli utworzysz nowy schemat dla tego samouczka, możesz utworzyć nowy wolumin w tym schemacie.
- Aby utworzyć nowy wolumin w istniejącym schemacie, musisz mieć następujące uprawnienia:
USE CATALOG
dla wykazu nadrzędnego.ALL PRIVILEGES
lubUSE SCHEMA
iCREATE VOLUME
uprawnienia w schemacie docelowym.
- Opcjonalnie możesz użyć istniejącego woluminu. Musisz mieć następujące uprawnienia:
USE CATALOG
dla wykazu nadrzędnego.USE SCHEMA
dla schematu nadrzędnego.ALL PRIVILEGES
lubREAD VOLUME
iWRITE VOLUME
na woluminie docelowym.
- Aby utworzyć nowy wolumin w istniejącym schemacie, musisz mieć następujące uprawnienia:
Aby ustawić te uprawnienia, skontaktuj się z administratorem usługi Databricks. Aby uzyskać więcej informacji na temat uprawnień wykazu aparatu Unity, zobacz Uprawnienia wykazu aparatu Unity i zabezpieczane obiekty.
- Aby utworzyć nowy schemat w wykazie, musisz mieć
Krok 0. Pobieranie danych
W tym przykładzie dane są ładowane z woluminu wykazu aparatu Unity. Poniższy kod pobiera plik CSV i przechowuje go w określonym woluminie. Otwórz nowy notes i uruchom następujący kod, aby pobrać te dane do określonego woluminu:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Zastąp <catalog-name>
wartości , <schema-name>
i <volume-name>
nazwami wykazu, schematu i woluminu dla woluminu wykazu aparatu Unity. Podany kod próbuje utworzyć określony schemat i wolumin, jeśli te obiekty nie istnieją. Musisz mieć odpowiednie uprawnienia do tworzenia i zapisywania obiektów w wykazie aparatu Unity. Zobacz Wymagania.
Uwaga
Przed kontynuowaniem pracy z samouczkiem upewnij się, że ten notes został pomyślnie uruchomiony. Nie należy konfigurować tego notesu w ramach potoku.
Krok 1. Tworzenie potoku
Delta Live Tables tworzy potoki, rozwiązując zależności zdefiniowane w notesach lub plikach (nazywanym kodem źródłowym) przy użyciu składni delta Live Tables. Każdy plik kodu źródłowego może zawierać tylko jeden język, ale w potoku można dodać wiele notesów lub plików specyficznych dla języka.
Ważne
Nie należy konfigurować żadnych zasobów w polu Kod źródłowy. Pozostawienie tego pola czarnego powoduje utworzenie i skonfigurowanie notesu na potrzeby tworzenia kodu źródłowego.
Instrukcje w tym samouczku korzystają z bezserwerowych zasobów obliczeniowych i wykazu aparatu Unity. Użyj ustawień domyślnych dla wszystkich opcji konfiguracji, które nie zostały wymienione w tych instrukcjach.
Uwaga
Jeśli bezserwerowa nie jest włączona lub obsługiwana w obszarze roboczym, możesz ukończyć samouczek zgodnie z instrukcjami napisanymi przy użyciu domyślnych ustawień obliczeniowych. Musisz ręcznie wybrać pozycję Wykaz aparatu Unity w obszarze Opcje magazynu w sekcji Miejsce docelowe w interfejsie użytkownika tworzenia potoku .
Aby skonfigurować nowy potok, wykonaj następujące czynności:
- Kliknij pozycję Delta Live Tables (Tabele na żywo funkcji Delta) na pasku bocznym.
- Kliknij pozycję Utwórz potok.
- Podaj unikatową nazwę potoku.
- Zaznacz pole wyboru obok pozycji Bezserwerowe.
- Wybierz katalog, aby opublikować dane.
- Wybierz schemat w wykazie.
- Określ nową nazwę schematu, aby utworzyć schemat.
- Zdefiniuj trzy parametry potoku przy użyciu przycisku Dodaj konfigurację w obszarze Zaawansowane, aby dodać trzy konfiguracje. Określ katalog, schemat i wolumin, do którego pobrano dane, używając następujących nazw parametrów:
my_catalog
my_schema
my_volume
- Kliknij pozycję Utwórz.
Interfejs użytkownika potoków zostanie wyświetlony dla nowo utworzonego potoku. Notes kodu źródłowego jest tworzony automatycznie i konfigurowany dla potoku.
Notes jest tworzony w nowym katalogu w katalogu użytkownika. Nazwa nowego katalogu i pliku jest zgodna z nazwą potoku. Na przykład /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Link umożliwiający dostęp do tego notesu znajduje się w polu Kod źródłowy w panelu Szczegóły potoku. Kliknij link, aby otworzyć notes przed przejściem do następnego kroku.
Krok 2. Deklarowanie zmaterializowanych widoków i tabel przesyłania strumieniowego w notesie przy użyciu języka Python lub SQL
Notesy usługi Datbricks umożliwiają interaktywne opracowywanie i weryfikowanie kodu źródłowego dla potoków tabel delta Live Tables. Aby korzystać z tej funkcji, musisz dołączyć notes do potoku. Aby dołączyć nowo utworzony notes do właśnie utworzonego potoku:
- Kliknij pozycję Połącz w prawym górnym rogu, aby otworzyć menu konfiguracji obliczeniowej.
- Umieść kursor na nazwie potoku utworzonego w kroku 1.
- Kliknij Połącz.
Zmiany interfejsu użytkownika w celu uwzględnienia przycisków Weryfikuj i Uruchom w prawym górnym rogu. Aby dowiedzieć się więcej na temat obsługi notesu na potrzeby tworzenia kodu potoku, zobacz Tworzenie i debugowanie potoków tabel delta live tables w notesach.
Ważne
- Potoki delta Live Tables oceniają wszystkie komórki w notesie podczas planowania. W przeciwieństwie do notesów, które są uruchamiane względem obliczeń wszystkich celów lub zaplanowanych jako zadania, potoki nie gwarantują, że komórki działają w określonej kolejności.
- Notesy mogą zawierać tylko jeden język programowania. Nie mieszaj języka Python i kodu SQL w notesach kodu źródłowego potoku.
Aby uzyskać szczegółowe informacje na temat tworzenia kodu w języku Python lub SQL, zobacz Tworzenie kodu potoku przy użyciu języka Python lub Programowanie kodu potoku przy użyciu języka SQL.
Przykładowy kod potoku
Aby zaimplementować przykład w tym samouczku, skopiuj i wklej następujący kod do komórki w notesie skonfigurowanym jako kod źródłowy potoku.
Podany kod wykonuje następujące czynności:
- Importuje niezbędne moduły (tylko język Python).
- Odwołuje się do parametrów zdefiniowanych podczas konfiguracji potoku.
- Definiuje tabelę przesyłania strumieniowego o nazwie
baby_names_raw
, która pozysuje z woluminu. - Definiuje zmaterializowany widok o nazwie
baby_names_prepared
, który weryfikuje pozyskane dane. - Definiuje zmaterializowany widok o nazwie
top_baby_names_2021
, który ma wysoce wyrafinowany widok danych.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Krok 3. Uruchamianie aktualizacji potoku
Aby rozpocząć aktualizację potoku, kliknij przycisk Start w prawym górnym rogu interfejsu użytkownika notesu.
Przykładowe notesy
Poniższe notesy zawierają te same przykłady kodu przedstawione w tym artykule. Te notesy mają te same wymagania co kroki opisane w tym artykule. Zobacz Wymagania.
Aby zaimportować notes, wykonaj następujące kroki:
- Otwórz interfejs użytkownika notesu.
- Kliknij pozycję + Nowy>notes.
- Zostanie otwarty pusty notes.
- Kliknij kolejno pozycje File (Plik)>Import (Importuj). Zostanie wyświetlone okno dialogowe Importowanie .
- Wybierz opcję Adres URL importu z.
- Wklej adres URL notesu.
- Kliknij przycisk Importuj.
Ten samouczek wymaga uruchomienia notesu konfiguracji danych przed skonfigurowaniem i uruchomieniem potoku delta Live Tables. Zaimportuj poniższy notes, dołącz notes do zasobu obliczeniowego, wypełnij wymaganą zmienną dla my_catalog
, my_schema
i my_volume
, a następnie kliknij pozycję Uruchom wszystko.
Samouczek pobierania danych dla potoków
Poniższe notesy zawierają przykłady w języku Python lub SQL. Podczas importowania notesu jest on zapisywany w katalogu głównym użytkownika.
Po zaimportowaniu jednego z poniższych notesów wykonaj kroki tworzenia potoku, ale użyj selektora pliku kodu źródłowego, aby wybrać pobrany notes. Po utworzeniu potoku za pomocą notesu skonfigurowanego jako kod źródłowy kliknij przycisk Start w interfejsie użytkownika potoku, aby wyzwolić aktualizację.