Przekształcanie danych za pomocą usługi Delta Live Tables
W tym artykule opisano, jak można użyć funkcji Delta Live Tables do deklarowania przekształceń w zestawach danych i określania sposobu przetwarzania rekordów za pomocą logiki zapytań. Zawiera również przykłady typowych wzorców transformacji służących do budowy potoków Delta Live Tables.
Zestaw danych można zdefiniować względem dowolnego zapytania, które zwraca ramkę danych. Możesz użyć wbudowanych operacji platformy Apache Spark, UDF-ów, niestandardowej logiki i modeli MLflow jako przekształceń w potoku usługi Delta Live Tables. Po zasileniu danych do potoku Delta Live Tables można zdefiniować nowe zestawy danych na podstawie źródeł nadrzędnych, aby utworzyć nowe przesyły strumieniowe tables, zmaterializowane dane viewsoraz views.
Aby dowiedzieć się, jak skutecznie wykonywać przetwarzanie stanowe z użyciem Delta Live Tables, zobacz Optimize przetwarzanie stanowe w Delta Live Tables ze znacznikami czasowymi (watermarks).
Kiedy używać zmaterializowanego views, viewsi przesyłania strumieniowego tables
Podczas implementowania zapytań potoku wybierz najlepszy typ zestawu danych, aby upewnić się, że są wydajne i możliwe do utrzymania.
Rozważ użycie widoku, aby wykonać następujące czynności:
- Podziel duże lub złożone zapytanie, które chcesz łatwiej zarządzać zapytaniami.
- Zweryfikuj wyniki pośrednie przy użyciu oczekiwań.
- Zmniejsz koszty magazynowania i zasobów obliczeniowych, aby uzyskać wyniki, których nie trzeba utrwalać. Ponieważ tables są zmaterializowane, wymagają dodatkowych zasobów obliczeniowych i zasobów magazynowania.
Rozważ użycie zmaterializowanego widoku, gdy:
- Wiele zapytań podrzędnych wykorzystuje table. Ponieważ views są obliczane na żądanie, widok jest obliczany ponownie za każdym razem, gdy zostanie zapytany widok.
- Inne potoki, zadania lub zapytania konsumują table. Ponieważ views nie są zmaterializowane, można ich używać tylko w tej samej sekwencji.
- Chcesz wyświetlić wyniki zapytania podczas opracowywania. Ponieważ tables są zmaterializowane i mogą być wyświetlane, a zapytania mogą być wykonywane poza strumieniem danych, użycie tables podczas opracowywania może pomóc w zweryfikowaniu poprawności obliczeń. Po zweryfikowaniu przekonwertuj zapytania, które nie wymagają materializacji na views.
Rozważ użycie przesyłania strumieniowego table, gdy:
- Zapytanie jest definiowane względem źródła danych, które stale lub przyrostowo rośnie.
- Wyniki zapytania powinny być obliczane przyrostowo.
- Potok wymaga wysokiej przepływności i małych opóźnień.
Uwaga
tables przesyłania strumieniowego są zawsze definiowane względem źródeł przesyłania strumieniowego. Możesz również użyć źródeł przesyłania strumieniowego za APPLY CHANGES INTO
pomocą polecenia , aby zastosować aktualizacje z kanałów informacyjnych CDC. Zobacz interfejsy API APPLY CHANGES: upraszczanie przechwytywania danych zmian za pomocą Delta Live Tables.
Wyklucz tables z docelowego schema
Jeśli musisz obliczyć pośrednie tables, które nie są przeznaczone do użytku zewnętrznego, możesz zapobiec ich publikowaniu w schema przy użyciu słowa kluczowego TEMPORARY
. Tymczasowa pamięć tables nadal przechowuje i przetwarza dane zgodnie z semantyką Delta Live Tables, ale nie powinna być dostępna poza bieżącym potokiem. Tymczasowy table trwa przez cały okres istnienia pipeline'u, który go tworzy. Użyj następującej składni, aby zadeklarować tymczasowe tables:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Połącz przesyłanie strumieniowe tables i zmaterializowane views w jednym potoku
Przesyłanie strumieniowe tables dziedziczy gwarancje przetwarzania z Apache Spark Structured Streaming i jest skonfigurowane do przetwarzania zapytań z danych źródłowych tylko w trybie dodawania, where nowe wiersze są zawsze wstawiane do źródła table, a nie modyfikowane.
Uwaga
Mimo że domyślnie przesyłanie strumieniowe tables wymaga źródeł danych tylko do dołączania, gdy źródłem przesyłania strumieniowego jest inne źródło strumieniowe table, które wymaga aktualizacji lub usuwania, można zastąpić to zachowanie, używając flagi skipChangeCommits.
Typowy wzorzec przesyłania strumieniowego obejmuje pozyskiwanie danych źródłowych w celu utworzenia początkowych zestawów danych w potoku. Te początkowe zestawy danych są często nazywane brązowymi tables i często wykonują proste przekształcenia.
Natomiast końcowa tables w potoku, nazywana często złotym tables, wymaga skomplikowanych agregacji lub odczytu z celów operacji APPLY CHANGES INTO
. Ponieważ te operacje z natury tworzą aktualizacje zamiast dołączać, nie są one obsługiwane jako wejście dla streamingu tables. Te przekształcenia są lepiej dostosowane do materializacji views.
Łącząc przesyłanie strumieniowe tables z zmaterializowanymi danymi views w jednym potoku, możesz uprościć potok, uniknąć kosztownego ponownego pozyskiwania lub ponownego przetwarzania danych pierwotnych i wykorzystać pełne możliwości SQL do obliczania złożonych agregacji w wydajnie zakodowanym i przefiltrowanym zestawie danych. W poniższym przykładzie przedstawiono ten typ przetwarzania mieszanego:
Uwaga
W tych przykładach użyto modułu automatycznego ładującego do ładowania plików z magazynu w chmurze. Aby załadować pliki z automatycznym modułem ładującym w potoku z obsługą Catalog aparatu Unity, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej na temat używania Unity Catalog z Delta Live Tables, zobacz Używanie Unity Catalog z twoimi potokami Delta Live Tables.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("LIVE.streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
Dowiedz się więcej na temat używania automatycznego modułu ładującego do przyrostowego pozyskiwania plików JSON z usługi Azure Storage.
Sprzężenia statyczne strumienia
Łączenia strumieniowo-statyczne są dobrym wyborem w przypadku denormalizacji ciągłego strumienia danych wyłącznie do dołączania z głównie statycznym wymiarem table.
W przypadku każdego potoku updatenowe rekordy ze strumienia są łączone z bieżącą migawką statycznego table. Jeśli rekordy są dodawane lub aktualizowane w statycznych table po przetworzeniu odpowiednich danych z przesyłania strumieniowego table, wynikowe rekordy nie są ponownie obliczane, chyba że pełna refresh zostanie wykonana.
W liniach przetwarzania skonfigurowanych do uruchamiania na żądanie, statyczne table zwraca wyniki z chwili, gdy rozpoczęto update. W potokach skonfigurowanych do ciągłego wykonywania, najnowsza wersja statycznego table jest odpytywana za każdym razem, gdy table przetwarza update.
Poniżej przedstawiono przykład joinstrumienia statycznego:
Python
@dlt.table
def customer_sales():
return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
Wydajne obliczanie agregacji
Za pomocą przesyłania strumieniowego tables można obliczać przyrostowo proste agregaty dystrybucyjne, takie jak zliczenie, minimum, maksimum lub suma, oraz agregaty algebraiczne, takie jak średnia lub odchylenie standardowe. Usługa Databricks zaleca agregację przyrostową dla zapytań z ograniczoną liczbą grup, takich jak zapytanie z klauzulą GROUP BY country
. Tylko nowe dane wejściowe są odczytywane z każdym update.
Aby dowiedzieć się więcej na temat pisania zapytań usługi Delta Live Tables wykonujących agregacje przyrostowe, przeczytaj artykuł Wykonywanie agregacji okien przy użyciu znaków wodnych.
Używanie modeli MLflow w potoku Delta Live Tables
Uwaga
Aby używać modeli MLflow w potoku z obsługą Catalogaparatu Unity, potok musi być skonfigurowany do korzystania z kanału preview
. Aby użyć kanału current
, należy skonfigurować potok do publikowania w magazynie metadanych Hive.
Modele trenowane przez platformę MLflow można używać w potokach usługi Delta Live Tables. Modele MLflow są traktowane jako przekształcenia w usłudze Azure Databricks, co oznacza, że działają na danych wejściowych ramki danych platformy Spark i zwracają wyniki jako ramkę danych platformy Spark. Ponieważ usługa Delta Live Tables definiuje zestawy danych względem ramek danych, można przekonwertować obciążenia platformy Apache Spark, które używają biblioteki MLflow do funkcji Delta Live Tables z zaledwie kilkoma wierszami kodu. Aby uzyskać więcej informacji na temat platformy MLflow, zobacz MLflow for gen AI app and model lifecycle.
Jeśli masz już notes języka Python wywołujący model MLflow, możesz dostosować ten kod do Delta Live Tables przy użyciu dekoratora @dlt.table
i upewnienia się, że funkcje są zdefiniowane do zwracania wyników transformacji. Usługa Delta Live Tables domyślnie nie instaluje bibliotek MLflow, dlatego upewnij się, że zainstalowano biblioteki MLFlow z %pip install mlflow
i zaimportowano mlflow
i dlt
w górnej części notesu. Aby zapoznać się z wprowadzeniem do składni usługi Delta Live Tables, zobacz temat Tworzenie kodu potoków przy użyciu języka Python.
Aby użyć modeli MLflow w usłudze Delta Live Tables, wykonaj następujące kroki:
- Uzyskaj identyfikator przebiegu i nazwę modelu MLflow. Identyfikator przebiegu i nazwa modelu są używane do konstruowania identyfikatora URI modelu MLflow.
- Użyj identyfikatora URI, aby zdefiniować funkcję UDF platformy Spark w celu załadowania modelu MLflow.
- Wywołaj funkcję UDF w definicjach table, aby użyć modelu MLflow.
W poniższym przykładzie przedstawiono podstawową składnię dla tego wzorca:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
W kompletnym przykładzie poniższy kod definiuje funkcję UDF platformy Spark o nazwie loaded_model_udf
, która ładuje model MLflow wyszkolony na podstawie danych o ryzyku kredytowym. Dane columns użyte do przewidywania są przekazywane jako argument do funkcji FUO.
table
loan_risk_predictions
oblicza przewidywania dla każdego wiersza w loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Zachowywanie ręcznego usuwania lub aktualizacji
Usługa Delta Live Tables umożliwia ręczne usuwanie lub update rekordów z table i wykonywanie operacji refresh w celu ponownego skompilowania podrzędnych tables.
Domyślnie usługa Delta Live Tables ponownie przelicza wyniki table bazując na danych wejściowych za każdym razem, gdy potok danych zostanie zaktualizowany, dlatego powinieneś upewnić się, że usunięty rekord nie zostanie ponownie wczytany z danych źródłowych. Ustawienie właściwości pipelines.reset.allowed
table na wartość false
blokuje odświeżanie table, ale nie blokuje przyrostowych zapisów w tables ani wpływania nowych danych do table.
Na poniższym diagramie przedstawiono przykład użycia dwóch streaming tables.
-
raw_user_table
pozyskiwanie nieprzetworzonych danych użytkownika ze źródła. -
bmi_table
przyrostowo oblicza wyniki BMI przy użyciu wagi i wysokości zraw_user_table
.
Chcesz ręcznie usunąć lub update rekordy użytkowników z raw_user_table
i ponownie skompilować bmi_table
.
Poniższy kod ilustruje ustawienie właściwości pipelines.reset.allowed
table na false
, aby wyłączyć pełną funkcję refresh dla raw_user_table
, dzięki czemu planowane zmiany są zachowane w czasie, ale podrzędne tables są przeliczane po uruchomieniu potoku update.
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);