Sdílet prostřednictvím


Transformace dat pomocí dynamických tabulek Delta

Tento článek popisuje, jak můžete pomocí Delta Live Tables deklarovat transformace datasetů a určit způsob zpracování záznamů logikou dotazu. Obsahuje také příklady běžných transformačních vzorů pro vytváření potrubí Delta Live Tables.

Datovou sadu můžete definovat pro jakýkoli dotaz, který vrací datový rámec. V kanálu Delta Live Tables můžete jako transformace použít integrované operace Apache Sparku, funkce definované uživatelem, vlastní logiku a modely MLflow. Po ingestování dat do kanálu Delta Live Tables můžete definovat nové datové sady vzhledem k upstream zdrojům a vytvářet nové streamované tabulky, materializovaná zobrazení a běžná zobrazení.

Informace o efektivním provádění stavového zpracování pomocí Delta Live Tables najdete v tématu Optimalizace stavového zpracování v Delta Live Tables pomocí vodítek.

Kdy použít zobrazení, materializovaná zobrazení a streamované tabulky

Při implementaci dotazů kanálu zvolte nejlepší typ datové sady, abyste zajistili, že jsou efektivní a udržovatelné.

Zvažte použití zobrazení k provedení následujících kroků:

  • Rozdělte velký nebo složitý dotaz, který chcete použít k jednodušší správě dotazů.
  • Ověřte průběžné výsledky pomocí očekávání.
  • Snižte náklady na úložiště a výpočetní prostředky pro výsledky, které nemusíte uchovávat. Vzhledem k tomu, že tabulky jsou materializované, vyžadují další výpočetní prostředky a prostředky úložiště.

Zvažte použití materializovaného zobrazení v následujících případech:

  • Tabulku spotřebovávají více podřízených dotazů. Vzhledem k tomu, že zobrazení se počítají na vyžádání, je zobrazení znovu vypočteno při každém dotazování zobrazení.
  • Tabulku spotřebovávají jiné kanály, úlohy nebo dotazy. Vzhledem k tomu, že zobrazení nejsou materializovaná, můžete je použít pouze ve stejném kanálu.
  • Chcete zobrazit výsledky dotazu během vývoje. Vzhledem k tomu, že tabulky jsou materializované a dají se zobrazit a dotazovat mimo kanál, může použití tabulek během vývoje pomoct ověřit správnost výpočtů. Po ověření převeďte dotazy, které nevyžadují materializaci do zobrazení.

Zvažte použití streamované tabulky v následujících případech:

  • Dotaz je definován proti zdroji dat, který se nepřetržitě nebo přírůstkově zvětšuje.
  • Výsledky dotazu by se měly vypočítat přírůstkově.
  • Kanál potřebuje vysokou propustnost a nízkou latenci.

Poznámka:

Streamované tabulky jsou vždy definovány proti zdrojům streamování. K instalaci aktualizací z informačních kanálů CDC můžete použít také zdroje APPLY CHANGES INTO streamování. Viz rozhraní API pro použití změn: Zjednodušte zachycování změnových dat pomocí Delta Live Tables.

Vyloučení tabulek z cílového schématu

Pokud je nutné vypočítat zprostředkující tabulky, které nejsou určené pro externí spotřebu, můžete zabránit jejich publikování do schématu pomocí klíčového slova TEMPORARY. Dočasné tabulky stále ukládají a zpracovávají data podle sémantiky Delta Live Tables, ale neměly by být přístupné mimo aktuální kanál. Dočasná tabulka zůstane po celou dobu životnosti datového toku, který ji vytvoří. K deklaraci dočasných tabulek použijte následující syntaxi:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Python

@dlt.table(
  temporary=True)
def temp_table():
  return ("...")

Kombinování streamovaných tabulek a materializovaných zobrazení v jednom kanálu

Streamované tabulky dědí záruky zpracování Strukturovaného Streamování Apache Spark a jsou nakonfigurované tak, aby zpracovávaly dotazy z přidávacích zdrojů dat, kde se nové řádky vždy vkládají do zdrojové tabulky, než aby byly upraveny.

Poznámka:

I když streamované tabulky ve výchozím nastavení vyžadují zdroje dat, do kterých lze pouze přidávat, můžete toto chování přepsat příznakem skipChangeCommitsv případě, že zdrojem streamování je jiná streamovaná tabulka, která vyžaduje aktualizace nebo odstranění.

Běžný model streamování zahrnuje ingestování zdrojových dat k vytvoření počátečních datových sad v kanálu. Tyto počáteční datové sady se běžně označují jako bronzové tabulky a často provádějí jednoduché transformace.

Naproti tomu konečné tabulky v datovém toku, běžně označované jako zlaté tabulky, často vyžadují složité agregace nebo čtení z cílových bodů operace APPLY CHANGES INTO. Vzhledem k tomu, že tyto operace vytvářejí aktualizace místo připojování, nejsou podporovány jako vstupy pro streamovací tabulky. Tyto transformace jsou vhodnější pro materializovaná zobrazení.

Kombinováním streamovaných tabulek a materializovaných zobrazení do jednoho kanálu můžete kanál zjednodušit, vyhnout se nákladnému opakovanému příjmu dat nebo opětovnému zpracování nezpracovaných dat a mít plný výkon SQL k výpočtu složitých agregací přes efektivně zakódovanou a filtrovanou datovou sadu. Následující příklad ukazuje tento typ smíšeného zpracování:

Poznámka:

Tyto příklady používají k načtení souborů z cloudového úložiště automatický zavaděč. Pokud chcete načíst soubory s funkcí Auto Loader v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.readStream.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

Přečtěte si další informace o použití automatického zavaděče k přírůstkové ingestování souborů JSON z úložiště Azure.

Statické spojení streamu

Spojení streamu s statickou dimenzí jsou vhodnou volbou pro denormalizaci souvislého streamu dat pomocí primárně statické tabulky dimenzí.

Při každé aktualizaci kanálu se nové záznamy ze streamu připojí k nejaktuálnějšímu snímku statické tabulky. Pokud jsou záznamy přidány nebo aktualizovány ve statické tabulce po zpracování odpovídajících dat z tabulky streamování, výsledné záznamy se nepřepočítávají, pokud se neprovede úplná aktualizace.

V kanálech nakonfigurovaných pro aktivované spuštění vrátí statická tabulka výsledky po spuštění aktualizace. V potrubích nakonfigurovaných pro průběžné spouštění je při každém zpracování aktualizace dotazována nejnovější verze statické tabulky.

Následuje příklad spojení mezi proudem a statickou množinou dat:

Python

@dlt.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Efektivní výpočet agregací

Streamované tabulky můžete použít k přírůstkové výpočtu jednoduchých distribuačních agregací, jako je počet, minimum, maximum nebo součet a algebraické agregace, jako jsou průměr nebo směrodatná odchylka. Databricks doporučuje přírůstkovou agregaci pro dotazy s omezeným počtem skupin, jako je dotaz s klauzulí GROUP BY country . Při každé aktualizaci se čtou jenom nová vstupní data.

Další informace o psaní dotazů Delta Live Tables, které provádějí přírůstkové agregace, najdete v tématu Provádění agregací s okny s vodoznaky.

Použití modelů MLflow v pipeline Delta Live Tables

Poznámka:

Pokud chcete používat modely MLflow v kanálu s podporou katalogu Unity, musí být váš kanál nakonfigurovaný tak, aby používal kanál preview. Pokud chcete kanál použít current , musíte nakonfigurovat kanál tak, aby se publikoval do metastoru Hive.

V kanálech Delta Live Tables můžete používat modely natrénované pomocí MLflow. Modely MLflow se v Azure Databricks považují za transformace, což znamená, že pracují se vstupem datového rámce Sparku a vrací výsledky jako datový rámec Sparku. Vzhledem k tomu, že Delta Live Tables vytváří datové sady z datových rámců, můžete převést úlohy Apache Sparku, které používají MLflow, na Delta Live Tables s pouhými několika řádky kódu. Další informace o MLflow najdete v sekci MLflow pro agenty generativní AI a životní cyklus modelu ML.

Pokud už máte poznámkový blok Pythonu, který volá model MLflow, můžete tento kód přizpůsobit Delta Live Tables pomocí @dlt.table dekorátoru a zajistit, aby definice funkcí vracely výsledky transformace. Delta Live Tables ve výchozím nastavení neinstaluje MLflow, proto ověřte, že jste nainstalovali knihovny MLflow s %pip install mlflow a importovali mlflow a dlt na začátku svého poznámkového bloku. Úvod do syntaxe Delta Live Tables viz téma Vývoj kódu kanálu pomocí Pythonu.

Pokud chcete používat modely MLflow v Delta Live Tables, proveďte následující kroky:

  1. Získejte ID spuštění a název modelu MLflow. ID spuštění a název modelu se používají k vytvoření identifikátoru URI modelu MLflow.
  2. Pomocí identifikátoru URI definujte UDF Sparku pro načtení modelu MLflow.
  3. Zavolejte UDF v definicích tabulky, aby se použil model MLflow.

Následující příklad ukazuje základní syntaxi pro tento vzor:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Jako úplný příklad následující kód definuje UDF Sparku s názvem loaded_model_udf , který načte model MLflow natrénovaný na data úvěrového rizika. Datové sloupce použité k předpovědi se předávají jako argument do uživatelsky definované funkce (UDF). Tabulka loan_risk_predictions vypočítá předpovědi pro každý řádek v loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Zachování ručních odstranění nebo aktualizací

Delta Live Tables umožňuje ručně odstranit nebo aktualizovat záznamy z tabulky a provést operaci aktualizace pro přepočítání podřízených tabulek.

Delta Live Tables ve výchozím nastavení přepočítá výsledky tabulky na základě vstupních dat při každé aktualizaci kanálu, takže je nutné zajistit, aby odstraněný záznam nebyl znovu načten ze zdrojových dat. Nastavením vlastnosti tabulky pipelines.reset.allowed na false zabráníte aktualizacím tabulky, ale nezabráníte přírůstkovým zápisům do tabulek nebo novým datům v toku do tabulky.

Následující diagram znázorňuje příklad pomocí dvou streamovaných tabulek:

  • raw_user_table ingestuje nezpracovaná uživatelská data ze zdroje.
  • bmi_table přírůstkově vypočítá skóre BMI pomocí váhy a výšky z raw_user_table.

Chcete ručně odstranit nebo aktualizovat záznamy uživatelů z raw_user_table a znovu zkompilovat bmi_table.

Zachování datového diagramu

Následující kód ukazuje nastavení vlastnosti tabulky pipelines.reset.allowed na false, aby byla deaktivována úplná aktualizace pro raw_user_table, takže zamýšlené změny jsou uchovávány v průběhu času, ale podřízené tabulky se znovu přepočítávají při spuštění aktualizace kanálu:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);