Udostępnij za pośrednictwem


Samouczek: Jak uruchomić pierwszy potok Delta Live Tables

Ten samouczek przeprowadzi Cię przez kroki konfigurowania pierwszego potoku Delta Live Tables, pisania podstawowego kodu ETL oraz uruchamiania aktualizacji potoku.

Wszystkie kroki opisane w tym samouczku są przeznaczone dla obszarów roboczych z włączonym Unity Catalog. Możesz również skonfigurować potoki Delta Live Tables do współpracy z legacy Hive metastore. Zobacz Użyj potoków Delta Live Tables z klasycznym metastore Hive.

Uwaga

Ten samouczek zawiera instrukcje dotyczące tworzenia i weryfikowania nowego kodu potoku przy użyciu notesów usługi Databricks. Potoki można również skonfigurować przy użyciu kodu źródłowego w plikach Python lub SQL.

Możesz skonfigurować potok tak, aby uruchamiał Twój kod, jeśli już posiadasz kod źródłowy napisany przy użyciu składni Delta Live Tables. Zobacz Konfigurowanie potoku Delta Live Tables.

Można używać w pełni deklaratywnej składni SQL w usłudze Databricks SQL do rejestrowania i ustawiania harmonogramów odświeżania dla zmaterializowanych widoków oraz tabel strumieniowych jako obiektów zarządzanych w ramach Unity Catalog. Zobacz Użyj widoków materializowanych w Databricks SQL oraz Załaduj dane przy użyciu strumieniowych tabel w Databricks SQL.

Przykład: pozyskiwanie i przetwarzanie danych dotyczących nazw dzieci w Nowym Jorku

W przykładzie w tym artykule użyto publicznie dostępnego zestawu danych zawierającego rekordy nowojorskich nazw dzieci. W tym przykładzie pokazano, jak korzystać z potoku Delta Live Tables, aby:

  • Odczytywanie nieprzetworzonych danych CSV z woluminu do tabeli.
  • Odczytaj rekordy z tabeli pozyskiwania i użyj tabel delta Live Tables oczekiwania, aby utworzyć nową tabelę zawierającą oczyszczone dane.
  • Użyj oczyszczonych rekordów jako danych wejściowych do zapytań delta Live Tables, które tworzą pochodne zestawy danych.

Ten kod przedstawia uproszczony przykład architektury medalonu. Zobacz Co to jest architektura medallion lakehouse?.

Implementacje tego przykładu są udostępniane dla języków Python i SQL. Wykonaj kroki, aby utworzyć nowy potok i notes, a następnie skopiuj podany kod.

Podano również przykładowe notesy z kompletnym kodem.

Wymagania

  • Aby uruchomić pipeline, musisz mieć uprawnienia do tworzenia klastra lub dostęp do polityki klastra definiującej klaster Delta Live Tables. Środowisko uruchomieniowe delta Live Tables tworzy klaster przed uruchomieniem potoku i kończy się niepowodzeniem, jeśli nie masz odpowiednich uprawnień.

  • Wszyscy użytkownicy mogą domyślnie wyzwalać aktualizacje przy użyciu potoków bezserwerowych. Bezserwerowa musi być włączona na poziomie konta i może nie być dostępna w regionie obszaru roboczego. Zobacz Włączanie przetwarzania bezserwerowego.

  • W przykładach w tym samouczku użyto Unity Catalog. Usługa Databricks zaleca utworzenie nowego schematu w celu uruchomienia tego samouczka, ponieważ wiele obiektów bazy danych jest tworzonych w schemacie docelowym.

    • Aby utworzyć nowy schemat w wykazie, musisz mieć uprawnienia ALL PRIVILEGES lub USE CATALOG i CREATE SCHEMA.
    • Jeśli nie możesz utworzyć nowego schematu, uruchom ten samouczek względem istniejącego schematu. Musisz mieć następujące uprawnienia:
      • USE CATALOG katalogu nadrzędnego.
      • ALL PRIVILEGES lub USE SCHEMA, CREATE MATERIALIZED VIEWoraz uprawnienia CREATE TABLE w schemacie docelowym.
    • W tym samouczku do przechowywania przykładowych danych jest używany wolumin. Usługa Databricks zaleca utworzenie nowego woluminu na potrzeby tego samouczka. Jeśli utworzysz nowy schemat dla tego samouczka, możesz utworzyć nowy wolumin w tym schemacie.
      • Aby utworzyć nowy wolumin w istniejącym schemacie, musisz mieć następujące uprawnienia:
        • USE CATALOG katalogu nadrzędnego.
        • ALL PRIVILEGES lub USE SCHEMA i uprawnienia CREATE VOLUME w schemacie docelowym.
      • Opcjonalnie możesz użyć istniejącego woluminu. Musisz mieć następujące uprawnienia:
        • USE CATALOG katalogu nadrzędnego.
        • USE SCHEMA schematu nadrzędnego.
        • ALL PRIVILEGES lub READ VOLUME i WRITE VOLUME na woluminie docelowym.

    Aby ustawić te uprawnienia, skontaktuj się z administratorem usługi Databricks. Aby uzyskać więcej informacji na temat uprawnień Unity Catalog, zobacz uprawnienia Unity Catalog i zabezpieczane obiekty.

Krok 0. Pobieranie danych

W tym przykładzie ładowane są dane z wolumenu Unity Catalog. Poniższy kod pobiera plik CSV i przechowuje go w określonym woluminie. Otwórz nowy notes i uruchom następujący kod, aby pobrać te dane do określonego woluminu:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

Zastąp <catalog-name>, <schema-name>i <volume-name> nazwami katalogu, schematu i woluminu dla woluminu Unity Catalog. Podany kod próbuje utworzyć określony schemat i wolumin, jeśli te obiekty nie istnieją. Musisz mieć odpowiednie uprawnienia do tworzenia i zapisu obiektów w Unity Catalog. Zobacz Wymagania.

Uwaga

Przed kontynuowaniem pracy z samouczkiem upewnij się, że ten notes został pomyślnie uruchomiony. Nie należy konfigurować tego notesu w ramach potoku.

Krok 1. Tworzenie potoku

Delta Live Tables tworzy potoki przez rozwiązywanie zależności zdefiniowanych w notesach lub plikach (nazywanych kodem źródłowym) przy użyciu składni Delta Live Tables. Każdy plik kodu źródłowego może zawierać tylko jeden język, ale w potoku można dodać wiele notesów lub plików specyficznych dla języka.

Ważne

Nie należy konfigurować żadnych zasobów w polu Kod źródłowy. Pozostawienie tego pola czarnego powoduje utworzenie i skonfigurowanie notesu na potrzeby tworzenia kodu źródłowego.

Instrukcje w tym samouczku korzystają z bezserwerowego przetwarzania i Katalogu Unity. Użyj ustawień domyślnych dla wszystkich opcji konfiguracji, które nie zostały określone w tych instrukcjach.

Uwaga

Jeśli bezserwerowa nie jest włączona lub obsługiwana w obszarze roboczym, możesz ukończyć samouczek zgodnie z instrukcjami napisanymi przy użyciu domyślnych ustawień obliczeniowych. W interfejsie użytkownika Create pipeline, w sekcji Destination, należy ręcznie wybrać Unity Catalog w obszarze Storage options.

Aby skonfigurować nowy potok, wykonaj następujące czynności:

  1. Na pasku bocznym kliknij opcję Delta Live Tables.
  2. Kliknij Utwórz przepływ.
  3. W nazwa potokuwpisz unikatową nazwę potoku.
  4. Zaznacz pole wyboru Serverless.
  5. W Cel, aby skonfigurować lokalizację wykazu aparatu Unity, w której są publikowane tabele, wybierz Katalog i Schemat.
  6. W Zaawansowanekliknij opcję Dodaj konfigurację, a następnie zdefiniuj parametry potoku dla katalogu, schematu i woluminu, do którego pobrano dane przy użyciu następujących nazw parametrów:
    • my_catalog
    • my_schema
    • my_volume
  7. Kliknij pozycję Utwórz.

Interfejs użytkownika dla potoków pojawia się dla nowego potoku. Notes kodu źródłowego jest tworzony automatycznie i konfigurowany dla potoku.

Notes jest tworzony w nowym katalogu w katalogu użytkownika. Nazwa nowego katalogu i pliku jest zgodna z nazwą potoku. Na przykład /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Link umożliwiający dostęp do tego notesu znajduje się w polu Kod źródłowy w panelu Szczegóły potoku. Kliknij link, aby otworzyć notes przed przejściem do następnego kroku.

Krok 2. Deklarowanie zmaterializowanych widoków i tabel przesyłania strumieniowego w notesie przy użyciu języka Python lub SQL

Notesy usługi Databricks umożliwiają interaktywne opracowywanie i weryfikowanie kodu źródłowego dla potoków Delta Live Tables. Aby korzystać z tej funkcji, musisz dołączyć notes do potoku. Aby dołączyć nowo utworzony notes do właśnie utworzonego potoku:

  1. Kliknij pozycję Połącz w prawym górnym rogu, aby otworzyć menu konfiguracji obliczeniowej.
  2. Umieść kursor na nazwie potoku utworzonego w kroku 1.
  3. Kliknij Połącz.

Zmiany interfejsu użytkownika w celu uwzględnienia przycisków Weryfikuj i Uruchom w prawym górnym rogu. Aby dowiedzieć się więcej na temat obsługi notatników na potrzeby tworzenia kodu potoków, zapoznaj się z artykułem "Tworzenie i debugowanie potoków Delta Live Tables w notatnikach".

Ważne

  • Potoki Delta Live Tables oceniają wszystkie komórki w notesie podczas planowania. W przeciwieństwie do notesów, które są uruchamiane względem obliczeń wszystkich celów lub zaplanowanych jako zadania, potoki nie gwarantują, że komórki działają w określonej kolejności.
  • Notesy mogą zawierać tylko jeden język programowania. Nie mieszaj języka Python i kodu SQL w notesach kodu źródłowego potoku.

Aby uzyskać szczegółowe informacje na temat tworzenia kodu w języku Python lub SQL, zobacz Tworzenie kodu potoku przy użyciu języka Python lub Programowanie kodu potoku przy użyciu języka SQL.

Przykładowy kod potoku

Aby zaimplementować przykład w tym samouczku, skopiuj i wklej następujący kod do komórki w notesie skonfigurowanym jako kod źródłowy potoku.

Podany kod wykonuje następujące czynności:

  • Importuje niezbędne moduły (tylko język Python).
  • Odwołuje się do parametrów zdefiniowanych w trakcie konfiguracji potoku.
  • Definiuje tabelę przesyłania strumieniowego o nazwie baby_names_raw pozyskiwaną z woluminu.
  • Definiuje zmaterializowany widok o nazwie baby_names_prepared , który weryfikuje pozyskane dane.
  • Definiuje zmaterializowany widok o nazwie top_baby_names_2021 , który ma wysoce wyrafinowany widok danych.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Krok 3: Rozpocznij aktualizację potoku

Aby rozpocząć aktualizację potoku, kliknij przycisk Uruchom w prawym górnym rogu interfejsu użytkownika notatnika.

Przykładowe notesy

Poniższe notesy zawierają te same przykłady kodu przedstawione w tym artykule. Te notesy mają te same wymagania co kroki opisane w tym artykule. Zobacz Wymagania.

Aby zaimportować notes, wykonaj następujące kroki:

  1. Otwórz interfejs użytkownika notesu.
    • Kliknij pozycję + Nowy>notes.
    • Zostanie otwarty pusty notes.
  2. Kliknij kolejno pozycje File (Plik)>Import (Importuj). Zostanie wyświetlone okno dialogowe Importowanie .
  3. Wybierz opcję adresu URL dla Import z.
  4. Wklej adres URL notesu.
  5. Kliknij przycisk Importuj.

Ten samouczek wymaga uruchomienia notesu przygotowania danych przed skonfigurowaniem i uruchomieniem potoku Delta Live Tables. Zaimportuj poniższy notes, dołącz notes do zasobu obliczeniowego, wypełnij wymaganą zmienną dla my_catalog, my_schemai my_volume, a następnie kliknij pozycję Uruchom wszystko.

Samouczek pobierania danych dla potoków

Weź notes

Poniższe notesy zawierają przykłady w języku Python lub SQL. Podczas importowania notesu jest on zapisywany w katalogu głównym użytkownika.

Po zaimportowaniu jednego z poniższych notesów wykonaj kroki tworzenia potoku, ale użyj selektora plików kodu źródłowego , aby wybrać pobrany notes. Po utworzeniu potoku za pomocą notatnika skonfigurowanego jako kod źródłowy kliknij przycisk Uruchom w interfejsie użytkownika potoku, aby uruchomić aktualizację.

Rozpocznij pracę z notebookiem Delta Live Tables w Pythonie

Weź notes

Rozpocznij pracę z notesem SQL usługi Delta Live Tables

Weź notes

Dodatkowe zasoby