Udostępnij za pośrednictwem


Opracowywanie kodu potoku przy użyciu języka Python

Delta Live Tables wprowadza kilka nowych konstrukcji kodu w języku Python do definiowania zmaterializowanych widoków i tabel strumieniowych w potokach. Obsługa języka Python na potrzeby tworzenia potoków opiera się na podstawach ramek danych PySpark i interfejsów API przesyłania strumieniowego ze strukturą.

W przypadku użytkowników niezaznajomionych z językami Python i DataFrame usługa Databricks zaleca korzystanie z interfejsu SQL. Zobacz Tworzenie kodu potoku przy użyciu języka SQL.

Aby uzyskać pełną dokumentację składni języka Python dla Delta Live Tables, zobacz Delta Live Tables Python language reference.

Podstawy tworzenia potoków w języku Python

Kod języka Python, który tworzy zestawy danych delta Live Tables, musi zwracać ramki danych.

Wszystkie interfejsy API Pythona dla Delta Live Tables są implementowane w module dlt. Kod potoku delta Live Tables zaimplementowany za pomocą języka Python musi jawnie zaimportować moduł dlt w górnej części notesów i plików języka Python.

Odczyty i zapisy domyślnie wykorzystują katalog i schemat określony podczas konfiguracji potoku. Zobacz Ustaw katalog docelowy i schemat.

Kod Pythona specyficzny dla Delta Live Tables różni się od innych typów kodu Pythona w jeden kluczowy sposób: kod potoku Pythona nie wywołuje bezpośrednio funkcji, które wykonują pozyskiwanie i przekształcanie danych w celu utworzenia zestawów danych Delta Live Tables. Zamiast tego funkcja Delta Live Tables interpretuje funkcje dekoratora z modułu dlt we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzy graf przepływu danych.

Ważne

Aby uniknąć nieoczekiwanego zachowania podczas uruchamiania potoku, nie dołączaj kodu, który może mieć skutki uboczne w funkcjach definiujących zestawy danych. Aby dowiedzieć się więcej, zobacz dokumentację języka Python.

Tworzenie zmaterializowanego widoku lub tabeli przesyłania strumieniowego przy użyciu języka Python

Dekorator @dlt.table nakazuje funkcji Delta Live Tables utworzenie zmaterializowanego widoku lub tabeli przesyłania strumieniowego na podstawie wyników zwracanych przez funkcję. Wyniki odczytu wsadowego tworzą zmaterializowany widok, natomiast wyniki odczytu strumieniowego tworzą tabelę strumieniową.

Domyślnie nazwy zmaterializowanego widoku i tabeli strumieniowej są wnioskowane z nazw funkcji. Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku i tabeli strumieniowej:

Uwaga

Obie funkcje odwołują się do tej samej tabeli w wykazie samples i używają tej samej funkcji dekoratora. Te przykłady podkreślają, że jedyną różnicą w podstawowej składni zmaterializowanych widoków i tabel przesyłania strumieniowego jest użycie spark.read versus spark.readStream.

Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo. Niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcjonalnie możesz określić nazwę tabeli przy użyciu argumentu name w dekoratorze @dlt.table. W poniższym przykładzie pokazano ten wzorzec dla zmaterializowanego widoku i tabeli strumieniowej.

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Ładowanie danych z magazynu obiektów

Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Uwaga

W tych przykładach używane są dane dostępne w obszarze /databricks-datasets automatycznie zainstalowanym w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy Unity Catalog?.

Usługa Databricks zaleca używanie Auto Loader i tabel strumieniowych podczas konfigurowania przetwarzań przyrostowego pozyskiwania danych względem danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to jest moduł automatycznego ładowania?.

W poniższym przykładzie tworzona jest tabela strumieniowa z plików JSON przy użyciu Auto Loader.

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Weryfikowanie danych z oczekiwaniami

Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzaj jakością danych z oczekiwaniami pipeline'u.

Poniższy kod używa @dlt.expect_or_drop metody do zdefiniowania oczekiwania o nazwie valid_data , która odrzuca rekordy, które mają wartość null podczas pozyskiwania danych:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel strumieniowych zdefiniowanych w potoku

W poniższym przykładzie zdefiniowano cztery zestawy danych:

  • Tabela strumieniowa o nazwie orders, która ładuje dane JSON.
  • Zmaterializowany widok o nazwie customers , który ładuje dane CSV.
  • Zmaterializowany widok o nazwie customer_orders , który łączy rekordy z orders zestawów danych i customers , rzutuje znacznik czasu zamówienia na datę i wybiera customer_idpola , order_number, statei order_date .
  • Zmaterializowany widok o nazwie daily_orders_by_state , który agreguje dzienną liczbę zamówień dla każdego stanu.

Uwaga

Podczas wykonywania zapytań dotyczących widoków lub tabel w potoku można bezpośrednio określić katalog i schemat albo użyć domyślnych wartości skonfigurowanych w potoku. W tym przykładzie tabele orders, customersi customer_orders są zapisywane i odczytywane z domyślnego katalogu i schematu skonfigurowanego dla twojego potoku.

Tradycyjny tryb publikowania używa schematu LIVE do wykonywania zapytań dotyczących innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku. W nowych potokach składnia schematu LIVE jest ignorowana w trybie dyskretnym. Zobacz schemat na żywo (starsza wersja).

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Twórz tabele w pętli for

Za pomocą pętli for języka Python można programowo tworzyć wiele tabel. Może to być przydatne, gdy masz wiele źródeł danych lub docelowych zestawów danych, które różnią się tylko o kilka parametrów, co zmniejsza ilość kodu do utrzymania i redukuje jego redundancję.

Pętla for ocenia logikę w kolejności szeregowej, ale po zakończeniu planowania dla zestawów danych logika przebiegów potoku jest równoległa.

Ważne

W przypadku używania tego wzorca do definiowania zestawów danych upewnij się, że lista wartości przekazanych do pętli for jest zawsze dodawalna. Jeśli zestaw danych zdefiniowany wcześniej w potoku zostanie pominięty w przyszłym uruchomieniu potoku, zostanie on usunięty automatycznie ze schematu docelowego.

Poniższy przykład tworzy pięć tabel filtrujących zamówienia klientów według regionów. W tym miejscu nazwa regionu służy do ustawiania nazwy zmaterializowanych widoków docelowych i filtrowania danych źródłowych. Widoki tymczasowe służą do definiowania sprzężeń z tabel źródłowych używanych w konstruowaniu końcowych zmaterializowanych widoków.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Poniżej przedstawiono przykład wykresu przepływu danych dla tego potoku:

Wykres przepływu danych dwóch widoków prowadzących do pięciu tabel regionalnych.

Rozwiązywanie problemów: pętla for tworzy wiele tabel z tymi samymi wartościami

Model wykonywania z opóźnieniem używany przez potoki do oceny kodu języka Python wymaga, aby logika bezpośrednio odwoływała się do poszczególnych wartości po wywołaniu funkcji ozdobionej przez @dlt.table().

W poniższym przykładzie pokazano dwa poprawne podejścia do definiowania tabel z pętlą for. W obu przykładach każda nazwa tabeli z listy tables jest jawnie przywołyowana w funkcji ozdobionej przez @dlt.table().

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Poniższy przykład nie poprawnie odwoływać się do wartości. W tym przykładzie są tworzone tabele o różnych nazwach, ale wszystkie tabele ładują dane z ostatniej wartości w pętli for:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)