Transformace dat pomocí pipelinek
Tento článek popisuje, jak můžete pomocí DLT deklarovat transformace datových sad a určit způsob zpracování záznamů pomocí logiky dotazu. Obsahuje také příklady běžných transformačních vzorů pro vytváření kanálů DLT.
Datovou sadu můžete definovat pro jakýkoli dotaz, který vrací datový rámec. Jako transformace v kanálu DLT můžete použít integrované operace Apache Sparku, funkce definované uživatelem, vlastní logiku a modely MLflow. Po ingestování dat do kanálu DLT můžete definovat nové datové sady pro upstreamové zdroje a vytvářet nové streamované tabulky, materializovaná zobrazení a zobrazení.
Informace o efektivním provádění stavových zpracování pomocí DLT najdete v tématu Optimalizace stavových zpracování v DLT pomocí vodoznaků.
Kdy použít zobrazení, materializovaná zobrazení a streamované tabulky
Při implementaci dotazů v rámci datového toku vyberte nejlepší typ datové sady, aby byla zajištěna jejich efektivita a udržovatelnost.
Zvažte použití zobrazení k provedení následujících kroků:
- Rozdělte velký nebo složitý dotaz, který chcete použít k jednodušší správě dotazů.
- Ověřte průběžné výsledky pomocí očekávání.
- Snižte náklady na úložiště a výpočetní prostředky pro výsledky, které nemusíte uchovávat. Vzhledem k tomu, že tabulky jsou materializované, vyžadují další výpočetní prostředky a prostředky úložiště.
Zvažte použití materializovaného zobrazení v následujících případech:
- Tabulku spotřebovávají více podřízených dotazů. Vzhledem k tomu, že zobrazení se počítají na vyžádání, je zobrazení znovu vypočteno při každém dotazování zobrazení.
- Tabulku spotřebovávají jiné kanály, úlohy nebo dotazy. Vzhledem k tomu, že zobrazení nejsou materializovaná, můžete je použít pouze ve stejném potrubí.
- Chcete zobrazit výsledky dotazu během vývoje. Vzhledem k tomu, že tabulky jsou materializované a dají se zobrazit a dotazovat mimo kanál, může použití tabulek během vývoje pomoct ověřit správnost výpočtů. Po ověření převeďte dotazy, které nevyžadují materializaci do zobrazení.
Zvažte použití streamované tabulky v následujících případech:
- Dotaz je definován proti zdroji dat, který se nepřetržitě nebo přírůstkově zvětšuje.
- Výsledky dotazu by se měly vypočítat přírůstkově.
- Kanál potřebuje vysokou propustnost a nízkou latenci.
Poznámka
Streamované tabulky jsou vždy definovány vzhledem ke zdrojům streamování. K instalaci aktualizací z informačních kanálů CDC můžete také použít zdroje streamování s APPLY CHANGES INTO
. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocíDLT .
Vyloučení tabulek z cílového schématu
Pokud je nutné vypočítat zprostředkující tabulky, které nejsou určené pro externí spotřebu, můžete zabránit jejich publikování do schématu pomocí klíčového slova TEMPORARY
. Dočasné tabulky stále ukládají a zpracovávají data podle sémantiky DLT, ale neměly by být přístupné mimo aktuální kanál. Dočasná tabulka zůstane po celou dobu životnosti kanálu, který ji vytvoří. K deklaraci dočasných tabulek použijte následující syntaxi:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Kombinování streamovaných tabulek a materializovaných zobrazení v jednom kanálu
Streamované tabulky dědí záruky zpracování strukturovaného streamování Apache Sparku a jsou nakonfigurované tak, aby zpracovávaly dotazy ze zdrojů dat jen pro připojení, kde se nové řádky vždy vkládají do zdrojové tabulky, a ne upravovat.
Poznámka
I když streamované tabulky ve výchozím nastavení vyžadují zdroje dat pouze přírůstkově, pokud je zdrojem streamování jiná streamovaná tabulka, která vyžaduje aktualizace nebo odstranění, můžete toto chování přepsat příznakem skipChangeCommits.
Běžný model streamování zahrnuje zpracování zdrojových dat k vytvoření počátečních datových sad v rámci zpracování. Tyto počáteční datové sady se běžně označují jako bronzové tabulky a často provádějí jednoduché transformace.
Naproti tomu konečné tabulky v datovém kanálu, běžně označované jako zlaté tabulky, často vyžadují složité agregace nebo čtení z cílů APPLY CHANGES INTO
operace. Vzhledem k tomu, že tyto operace inherentně vytvářejí aktualizace místo přidávání, nejsou podporovány jako vstupy do streamovaných tabulek. Tyto transformace jsou vhodnější pro materializovaná zobrazení.
Kombinováním streamovaných tabulek a materializovaných zobrazení do jednoho kanálu můžete kanál zjednodušit, vyhnout se nákladnému opakovanému příjmu dat nebo opětovnému zpracování nezpracovaných dat a mít plný výkon SQL k výpočtu složitých agregací přes efektivně zakódovanou a filtrovanou datovou sadu. Následující příklad ukazuje tento typ smíšeného zpracování:
Poznámka
Tyto příklady používají Auto Loader k načtení souborů z cloudového úložiště. Pokud chcete načíst soubory automatickým zavaděčem v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s DLT najdete v tématu Použití katalogu Unity s kanály DLT.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
Přečtěte si další informace o používání Auto Loaderu k přírůstkovému načítání souborů JSON z úložiště Azure.
statické spojení streamu
Spojení typu proud-statická tabulka jsou dobrou volbou při denormalizaci souvislého proudu pouze přidávaných dat s především statickou tabulkou dimenzí.
Při každé aktualizaci kanálu se nové záznamy ze streamu připojí k nejaktuálnějšímu snímku statické tabulky. Pokud jsou záznamy přidány nebo aktualizovány ve statické tabulce po zpracování odpovídajících dat z tabulky streamování, výsledné záznamy se nepřepočítávají, pokud se neprovede úplná aktualizace.
V kanálech nakonfigurovaných pro aktivované spuštění vrátí statická tabulka výsledky po spuštění aktualizace. V pipelinech nakonfigurovaných pro průběžné spouštění se při každém zpracování aktualizace dotazuje nejnovější verze statické tabulky.
Následuje příklad stream-statického spojení:
Python
@dlt.table
def customer_sales():
return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
Efektivní výpočet agregací
Streamované tabulky můžete použít k přírůstkové výpočtu jednoduchých distribuačních agregací, jako je počet, minimum, maximum nebo součet a algebraické agregace, jako jsou průměr nebo směrodatná odchylka. Databricks doporučuje přírůstkovou agregaci pro dotazy s omezeným počtem skupin, jako je například dotaz s klauzulí GROUP BY country
. Při každé aktualizaci se čtou jenom nová vstupní data.
Další informace o psaní dotazů DLT, které provádějí přírůstkové agregace, najdete v tématu Provádění agregací s okny s vodoznaky.
Použití modelů MLflow v kanálu DLT
Poznámka
Pokud chcete používat modely MLflow v kanálu s podporou katalogu Unity, musí být váš kanál nakonfigurovaný tak, aby používal kanál preview
. Pokud chcete použít kanál current
, musíte nakonfigurovat kanál tak, aby se publikoval do metastoru Hive.
V DLT pipelinech můžete použít modely natrénované MLflow. Modely MLflow se v Azure Databricks považují za transformace, což znamená, že pracují se vstupem datového rámce Sparku a vrací výsledky jako datový rámec Sparku. Vzhledem k tomu, že DLT definuje datové sady proti datovým rámcům, můžete převést úlohy Apache Sparku, které používají MLflow na DLT, jen s několika řádky kódu. Další informace o MLflow najdete v tématu MLflow pro generativního AI agenta a životní cyklus modelu ML.
Pokud už máte poznámkový blok Pythonu, který volá model MLflow, můžete tento kód přizpůsobit dlT pomocí dekorátoru @dlt.table
a zajistit, aby funkce byly definovány tak, aby vracely výsledky transformace. DLT ve výchozím nastavení neinstaluje MLflow, proto ověřte, že jste nainstalovali knihovny MLflow s %pip install mlflow
a importovali mlflow
a dlt
v horní části vašeho notebooku. Úvod do syntaxe DLT najdete v tématu Vývoj kódu kanálu pomocíPythonu.
Pokud chcete používat modely MLflow v DLT, proveďte následující kroky:
- Získejte ID spuštění a název modelu MLflow. ID spuštění a název modelu se používají k vytvoření URI modelu MLflow.
- Pomocí identifikátoru URI definujte UDF Sparku pro načtení modelu MLflow.
- Zavolejte UDF v definicích tabulky, aby se použil model MLflow.
Následující příklad ukazuje základní syntaxi pro tento vzor:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Jako úplný příklad definuje následující kód UDF Sparku pojmenovaný loaded_model_udf
, který načte model MLflow natrénovaný na data úvěrového rizika. Datové sloupce použité k předpovědi jsou předávány jako argument do uživatelsky definované funkce (UDF). Tabulka loan_risk_predictions
vypočítá předpovědi pro každý řádek v loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Zachovat ruční smazání nebo aktualizace
DLT umožňuje ručně odstranit nebo aktualizovat záznamy z tabulky a provést operaci aktualizace pro opětovné dokončování podřízených tabulek.
Ve výchozím nastavení DLT přepočítává výsledky tabulky na základě vstupních dat při každé aktualizaci datového kanálu, takže musíte zajistit, aby odstraněný záznam nebyl znovu načten ze zdrojových dat. Nastavením vlastnosti tabulky pipelines.reset.allowed
na false
zabráníte aktualizacím tabulky, ale nezabráníte přírůstkovým zápisům do tabulek nebo novým datům v toku do tabulky.
Následující diagram znázorňuje příklad pomocí dvou streamovaných tabulek:
-
raw_user_table
ingestuje nezpracovaná uživatelská data ze zdroje. -
bmi_table
přírůstkově vypočítá skóre BMI pomocí váhy a výšky zraw_user_table
.
Chcete ručně odstranit nebo aktualizovat záznamy uživatelů z raw_user_table
a znovu zkompilovat bmi_table
.
Následující kód ukazuje nastavení vlastnosti tabulky pipelines.reset.allowed
na false
tak, aby zakázal úplnou aktualizaci pro raw_user_table
, aby zamýšlené úpravy byly zachovány v čase, ale podřízené tabulky se přepočítají, když je spuštěna aktualizace datového toku.
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);