Udostępnij za pośrednictwem


Opracowywanie kodu potoku przy użyciu języka Python

Funkcja Delta Live Tables wprowadza kilka nowych konstrukcji kodu języka Python do definiowania zmaterializowanych widoków i tabel przesyłania strumieniowego w potokach. Obsługa języka Python na potrzeby tworzenia potoków opiera się na podstawach ramek danych PySpark i interfejsów API przesyłania strumieniowego ze strukturą.

W przypadku użytkowników niezaznajomionych z językami Python i DataFrame usługa Databricks zaleca korzystanie z interfejsu SQL. Zobacz Tworzenie kodu potoku przy użyciu języka SQL.

Aby uzyskać pełną dokumentację składni języka Python tabel delta live tables, zobacz Delta Live Tables Python language reference (Dokumentacja języka Python tabel na żywo funkcji Delta Live Tables).

Podstawy tworzenia potoków w języku Python

Kod języka Python, który tworzy zestawy danych delta Live Tables, musi zwracać ramki danych.

Wszystkie interfejsy API języka Python tabel delta live tables są implementowane w module dlt . Kod potoku delta Live Tables zaimplementowany za pomocą języka Python musi jawnie zaimportować dlt moduł w górnej części notesów i plików języka Python.

Kod języka Python specyficzny dla tabel na żywo funkcji delta różni się od innych typów kodu języka Python w jeden krytyczny sposób: kod potoku języka Python nie wywołuje bezpośrednio funkcji wykonujących pozyskiwanie i przekształcanie danych w celu utworzenia zestawów danych delta Live Tables. Zamiast tego funkcja Delta Live Tables interpretuje funkcje dekoratora z modułu dlt we wszystkich plikach kodu źródłowego skonfigurowanych w potoku i tworzy graf przepływu danych.

Ważne

Aby uniknąć nieoczekiwanego zachowania podczas uruchamiania potoku, nie dołączaj kodu, który może mieć skutki uboczne w funkcjach definiujących zestawy danych. Aby dowiedzieć się więcej, zobacz dokumentację języka Python.

Tworzenie zmaterializowanego widoku lub tabeli przesyłania strumieniowego przy użyciu języka Python

Dekorator @dlt.table nakazuje funkcji Delta Live Tables utworzenie zmaterializowanego widoku lub tabeli przesyłania strumieniowego na podstawie wyników zwracanych przez funkcję. Wyniki odczytu wsadowego tworzą zmaterializowany widok, a wyniki odczytu przesyłania strumieniowego tworzą tabelę przesyłania strumieniowego.

Domyślnie zmaterializowane nazwy tabel widoku i przesyłania strumieniowego są wnioskowane z nazw funkcji. Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku i tabeli przesyłania strumieniowego:

Uwaga

Obie funkcje odwołują się do tej samej tabeli w wykazie samples i używają tej samej funkcji dekoratora. Te przykłady podkreślają, że jedyną różnicą w podstawowej składni zmaterializowanych widoków i tabel przesyłania strumieniowego jest użycie wartości w porównaniu z spark.read spark.readStream.

Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo. Niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Opcjonalnie możesz określić nazwę tabeli przy użyciu argumentu name w dekoratorze @dlt.table . W poniższym przykładzie pokazano ten wzorzec dla zmaterializowanego widoku i tabeli przesyłania strumieniowego:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

Ładowanie danych z magazynu obiektów

Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Uwaga

W tych przykładach używane są dane dostępne w obszarze /databricks-datasets automatycznie zainstalowanym w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to są woluminy wykazu aparatu Unity?.

Usługa Databricks zaleca używanie tabel automatycznego modułu ładującego i przesyłania strumieniowego podczas konfigurowania obciążeń pozyskiwania przyrostowego względem danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to jest moduł automatycznego ładowania?.

W poniższym przykładzie tworzona jest tabela przesyłania strumieniowego z plików JSON przy użyciu modułu automatycznego ładującego:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

Weryfikowanie danych z oczekiwaniami

Możesz użyć oczekiwań, aby ustawić i wymusić ograniczenia dotyczące jakości danych. Zobacz Zarządzanie jakością danych za pomocą tabel delta live.

Poniższy kod używa @dlt.expect_or_drop metody do zdefiniowania oczekiwania o nazwie valid_data , która odrzuca rekordy, które mają wartość null podczas pozyskiwania danych:

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

Wykonywanie zapytań dotyczących zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku

Użyj schematu LIVE , aby wykonywać zapytania dotyczące innych zmaterializowanych widoków i tabel przesyłania strumieniowego zdefiniowanych w potoku.

W poniższym przykładzie zdefiniowano cztery zestawy danych:

  • Tabela przesyłania strumieniowego o nazwie orders , która ładuje dane JSON.
  • Zmaterializowany widok o nazwie customers , który ładuje dane CSV.
  • Zmaterializowany widok o nazwie customer_orders , który łączy rekordy z orders zestawów danych i customers , rzutuje znacznik czasu zamówienia na datę i wybiera customer_idpola , order_number, statei order_date .
  • Zmaterializowany widok o nazwie daily_orders_by_state , który agreguje dzienną liczbę zamówień dla każdego stanu.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("LIVE.orders")
    .join(spark.read.table("LIVE.customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

Tworzenie tabel w for pętli

Pętle języka Python for umożliwiają programowe tworzenie wielu tabel. Może to być przydatne, gdy masz wiele źródeł danych lub docelowych zestawów danych, które różnią się tylko o kilka parametrów, co powoduje zmniejszenie całkowitej liczby kodu w celu zachowania nadmiarowości kodu i mniejszej nadmiarowości kodu.

Pętla for ocenia logikę w kolejności szeregowej, ale po zakończeniu planowania dla zestawów danych logika przebiegów potoku jest równoległa.

Ważne

W przypadku używania tego wzorca do definiowania zestawów danych upewnij się, że lista wartości przekazanych do for pętli jest zawsze dodawalna. Jeśli zestaw danych zdefiniowany wcześniej w potoku zostanie pominięty z przyszłego uruchomienia potoku, zestaw danych zostanie usunięty automatycznie ze schematu docelowego.

Poniższy przykład tworzy pięć tabel filtrujących zamówienia klientów według regionów. W tym miejscu nazwa regionu służy do ustawiania nazwy zmaterializowanych widoków docelowych i filtrowania danych źródłowych. Widoki tymczasowe służą do definiowania sprzężeń z tabel źródłowych używanych w konstruowaniu końcowych zmaterializowanych widoków.

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("LIVE.customer_orders")
    nation_region = spark.read.table("LIVE.nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

Poniżej przedstawiono przykład wykresu przepływu danych dla tego potoku:

Wykres przepływu danych dwóch widoków prowadzących do pięciu tabel regionalnych.

Rozwiązywanie problemów: for pętla tworzy wiele tabel z tymi samymi wartościami

Model wykonywania z opóźnieniem używany przez potoki do oceny kodu języka Python wymaga, aby logika bezpośrednio odwoływała się do poszczególnych wartości po wywołaniu funkcji ozdobionej przez @dlt.table() .

W poniższym przykładzie przedstawiono dwa poprawne podejścia do definiowania tabel z pętlą for . W obu przykładach każda nazwa tabeli z tables listy jest jawnie przywołyowana w funkcji ozdobionej przez @dlt.table()element .

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

Poniższy przykład nie odwołuje się poprawnie do wartości. W tym przykładzie są tworzone tabele o różnych nazwach, ale wszystkie tabele ładują dane z ostatniej wartości w for pętli:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)