Przekształcanie danych za pomocą tabel delta live
W tym artykule opisano sposób deklarowania przekształceń w zestawach danych przy użyciu tabel różnicowych na żywo oraz określania sposobu przetwarzania rekordów za pomocą logiki zapytań. Zawiera również przykłady typowych wzorców przekształcania do tworzenia potoków tabel na żywo delty.
Zestaw danych można zdefiniować względem dowolnego zapytania, które zwraca ramkę danych. Możesz użyć wbudowanych operacji platformy Apache Spark, funkcji zdefiniowanych przez użytkownika, niestandardowych modeli logiki i MLflow jako przekształceń w potoku delta Live Tables. Po pozyskiwaniu danych do potoku delta Live Tables można zdefiniować nowe zestawy danych względem źródeł nadrzędnych w celu utworzenia nowych tabel przesyłania strumieniowego, zmaterializowanych widoków i widoków.
Aby dowiedzieć się, jak skutecznie wykonywać przetwarzanie stanowe przy użyciu tabel delta Live Tables, zobacz Optymalizowanie przetwarzania stanowego w tabelach delta live tables przy użyciu znaków wodnych.
Kiedy używać widoków, zmaterializowanych widoków i tabel przesyłania strumieniowego
Podczas implementowania zapytań potoku wybierz najlepszy typ zestawu danych, aby upewnić się, że są wydajne i możliwe do utrzymania.
Rozważ użycie widoku, aby wykonać następujące czynności:
- Podziel duże lub złożone zapytanie, które chcesz łatwiej zarządzać zapytaniami.
- Zweryfikuj wyniki pośrednie przy użyciu oczekiwań.
- Zmniejsz koszty magazynowania i zasobów obliczeniowych, aby uzyskać wyniki, których nie trzeba utrwalać. Ponieważ tabele są zmaterializowane, wymagają dodatkowych zasobów obliczeniowych i magazynu.
Rozważ użycie zmaterializowanego widoku, gdy:
- Wiele zapytań podrzędnych korzysta z tabeli. Ponieważ widoki są obliczane na żądanie, widok jest obliczany ponownie za każdym razem, gdy jest wykonywane zapytanie dotyczące widoku.
- Inne potoki, zadania lub zapytania używają tabeli. Ponieważ widoki nie są zmaterializowane, można ich używać tylko w tym samym potoku.
- Chcesz wyświetlić wyniki zapytania podczas opracowywania. Ponieważ tabele są zmaterializowane i mogą być wyświetlane i odpytywane poza potokiem, użycie tabel podczas programowania może pomóc zweryfikować poprawność obliczeń. Po zweryfikowaniu przekonwertuj zapytania, które nie wymagają materializacji w widoki.
Rozważ użycie tabeli przesyłania strumieniowego, gdy:
- Zapytanie jest definiowane względem źródła danych, które stale lub przyrostowo rośnie.
- Wyniki zapytania powinny być obliczane przyrostowo.
- Potok wymaga wysokiej przepływności i małych opóźnień.
Uwaga
Tabele przesyłania strumieniowego są zawsze definiowane względem źródeł przesyłania strumieniowego. Możesz również użyć źródeł przesyłania strumieniowego za APPLY CHANGES INTO
pomocą polecenia , aby zastosować aktualizacje z kanałów informacyjnych CDC. Zobacz Interfejsy API ZASTOSUJ ZMIANY: upraszczanie przechwytywania danych zmian za pomocą tabel różnicowych na żywo.
Wykluczanie tabel ze schematu docelowego
Jeśli musisz obliczyć tabele pośrednie, które nie są przeznaczone do użycia zewnętrznego, możesz uniemożliwić ich publikowanie w schemacie przy użyciu słowa kluczowego TEMPORARY
. Tabele tymczasowe nadal przechowują i przetwarzają dane zgodnie z semantykami tabel delta Live Tables, ale nie powinny być dostępne poza bieżącym potokiem. Tabela tymczasowa jest utrwalana przez okres istnienia potoku, który go tworzy. Użyj następującej składni, aby zadeklarować tabele tymczasowe:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Łączenie tabel przesyłania strumieniowego i zmaterializowanych widoków w jednym potoku
Tabele przesyłania strumieniowego dziedziczą gwarancje przetwarzania przesyłania strumieniowego ze strukturą platformy Apache Spark i są skonfigurowane do przetwarzania zapytań ze źródeł danych tylko do dołączania, gdzie nowe wiersze są zawsze wstawiane do tabeli źródłowej, a nie modyfikowane.
Uwaga
Mimo że domyślnie tabele przesyłania strumieniowego wymagają źródeł danych tylko do dołączania, gdy źródło przesyłania strumieniowego jest inną tabelą przesyłania strumieniowego, która wymaga aktualizacji lub usunięcia, można zastąpić to zachowanie flagą skipChangeCommits.
Typowy wzorzec przesyłania strumieniowego obejmuje pozyskiwanie danych źródłowych w celu utworzenia początkowych zestawów danych w potoku. Te początkowe zestawy danych są często nazywane tabelami z brązu i często wykonują proste przekształcenia.
Natomiast ostateczne tabele w potoku, nazywane często złotymi tabelami, często wymagają skomplikowanych agregacji lub odczytu z celów APPLY CHANGES INTO
operacji. Ponieważ te operacje z natury tworzą aktualizacje, a nie dołączają, nie są one obsługiwane jako dane wejściowe do tabel przesyłania strumieniowego. Te przekształcenia są lepiej dostosowane do zmaterializowanych widoków.
Łącząc tabele przesyłania strumieniowego i zmaterializowane widoki w jeden potok, można uprościć potok, uniknąć kosztownego ponownego pozyskiwania lub ponownego przetwarzania danych pierwotnych oraz mieć pełną moc języka SQL do obliczania złożonych agregacji w wydajnym zakodowanym i filtrowanym zestawie danych. W poniższym przykładzie przedstawiono ten typ przetwarzania mieszanego:
Uwaga
W tych przykładach użyto modułu automatycznego ładującego do ładowania plików z magazynu w chmurze. Aby załadować pliki z automatycznym modułem ładującym w potoku z obsługą wykazu aparatu Unity, należy użyć lokalizacji zewnętrznych. Aby dowiedzieć się więcej o korzystaniu z rozwiązania Unity Catalog z tabelami Delta Live Tables, zobacz Używanie ozwiązania Unity Catalog z potokami platformy Delta Live Tables.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("LIVE.streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
Dowiedz się więcej na temat używania automatycznego modułu ładującego do przyrostowego pozyskiwania plików JSON z usługi Azure Storage.
Sprzężenia statyczne strumienia
Sprzężenia statyczne strumienia są dobrym wyborem w przypadku denormalizacji ciągłego strumienia danych tylko do dołączania przy użyciu głównie statycznej tabeli wymiarów.
Po każdej aktualizacji potoku nowe rekordy ze strumienia są łączone z najbardziej bieżącą migawką tabeli statycznej. Jeśli rekordy są dodawane lub aktualizowane w tabeli statycznej po przetworzeniu odpowiednich danych z tabeli przesyłania strumieniowego, wynikowe rekordy nie zostaną ponownie obliczone, chyba że zostanie wykonane pełne odświeżenie.
W potokach skonfigurowanych do wyzwalanego wykonywania tabela statyczna zwraca wyniki od momentu rozpoczęcia aktualizacji. W potokach skonfigurowanych do ciągłego wykonywania najnowsza wersja tabeli statycznej jest odpytywane za każdym razem, gdy tabela przetwarza aktualizację.
Poniżej przedstawiono przykład sprzężenia statycznego strumienia:
Python
@dlt.table
def customer_sales():
return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
Wydajne obliczanie agregacji
Tabele przesyłania strumieniowego umożliwiają przyrostowe obliczanie prostych agregacji dystrybucyjnych, takich jak liczba, minimalna, maksymalna lub suma oraz agregacje algebraiczne, takie jak średnia lub odchylenie standardowe. Usługa Databricks zaleca agregację przyrostową dla zapytań z ograniczoną liczbą grup, takich jak zapytanie z klauzulą GROUP BY country
. Tylko nowe dane wejściowe są odczytywane przy każdej aktualizacji.
Aby dowiedzieć się więcej na temat pisania zapytań funkcji Delta Live Tables, które wykonują agregacje przyrostowe, zobacz Wykonywanie agregacji okiennych przy użyciu znaków wodnych.
Używanie modeli MLflow w potoku delta live tables
Uwaga
Aby używać modeli MLflow w potoku obsługującym wykaz aparatu Unity, potok musi być skonfigurowany do korzystania z kanału preview
. Aby użyć kanału current
, należy skonfigurować potok do publikowania w magazynie metadanych Hive.
Modele trenowane przez platformę MLflow można używać w potokach tabel delta live tables. Modele MLflow są traktowane jako przekształcenia w usłudze Azure Databricks, co oznacza, że działają na danych wejściowych ramki danych platformy Spark i zwracają wyniki jako ramkę danych platformy Spark. Ponieważ tabele delta Live Tables definiują zestawy danych względem ramek danych, można konwertować obciążenia platformy Apache Spark, które używają biblioteki MLflow do tabel na żywo funkcji Delta, używając tylko kilku wierszy kodu. Aby uzyskać więcej informacji na temat platformy MLflow, zobacz Zarządzanie cyklem życia uczenia maszynowego przy użyciu biblioteki MLflow.
Jeśli masz już notes języka Python wywołujący model MLflow, możesz dostosować ten kod do funkcji Delta Live Tables przy użyciu dekoratora @dlt.table
i zapewnienia, że funkcje są zdefiniowane w celu zwrócenia wyników transformacji. Usługi Delta Live Tables nie instalują domyślnie bibliotekI MLflow, dlatego upewnij się, że biblioteki %pip install mlflow
MLFlow zostały zainstalowane i zaimportowane mlflow
w dlt
górnej części notesu. Aby zapoznać się z wprowadzeniem do składni delta live tables, zobacz Develop pipeline code with Python (Opracowywanie kodu potoku przy użyciu języka Python).
Aby użyć modeli MLflow w tabelach delta live, wykonaj następujące kroki:
- Uzyskaj identyfikator przebiegu i nazwę modelu MLflow. Identyfikator przebiegu i nazwa modelu są używane do konstruowania identyfikatora URI modelu MLflow.
- Użyj identyfikatora URI, aby zdefiniować funkcję UDF platformy Spark w celu załadowania modelu MLflow.
- Wywołaj funkcję UDF w definicjach tabeli, aby użyć modelu MLflow.
W poniższym przykładzie przedstawiono podstawową składnię dla tego wzorca:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
W kompletnym przykładzie poniższy kod definiuje funkcję UDF platformy Spark o nazwie loaded_model_udf
, która ładuje model MLflow wyszkolony na podstawie danych o ryzyku kredytowym. Kolumny danych używane do przewidywania są przekazywane jako argument do funkcji zdefiniowanej przez użytkownika. loan_risk_predictions
Tabela oblicza przewidywania dla każdego wiersza w pliku loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Zachowywanie ręcznego usuwania lub aktualizacji
Funkcja Delta Live Tables umożliwia ręczne usuwanie lub aktualizowanie rekordów z tabeli i wykonywanie operacji odświeżania w celu ponownego skompilowania tabel podrzędnych.
Domyślnie tabele delta Live Tabele ponownie skompiluje wyniki tabeli na podstawie danych wejściowych za każdym razem, gdy potok zostanie zaktualizowany, dlatego należy się upewnić, że usunięty rekord nie zostanie ponownie załadowany z danych źródłowych. pipelines.reset.allowed
Ustawienie właściwości tabeli w taki sposób, aby false
uniemożliwia odświeżanie tabeli, ale nie uniemożliwia przyrostowych zapisów w tabelach lub nowych danych przepływanych do tabeli.
Na poniższym diagramie przedstawiono przykład użycia dwóch tabel przesyłania strumieniowego:
raw_user_table
pozyskiwanie nieprzetworzonych danych użytkownika ze źródła.bmi_table
przyrostowo oblicza wyniki BMI przy użyciu wagi i wysokości zraw_user_table
.
Chcesz ręcznie usunąć lub zaktualizować rekordy użytkowników z obiektu raw_user_table
i ponownie skompilować bmi_table
element .
Poniższy kod demonstruje ustawienie pipelines.reset.allowed
właściwości tabeli w celu false
wyłączenia pełnego odświeżania w raw_user_table
celu zachowania zamierzonych zmian w czasie, ale tabele podrzędne są ponownie obliczane po uruchomieniu aktualizacji potoku:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);