Przesyłanie strumieniowe rekordów do usług zewnętrznych za pomocą ujściów tabel delta Live Tables
Ważny
Interfejs API usługi Delta Live Tables sink
znajduje się w publicznej wersji zapoznawczej.
W tym artykule opisano interfejs API Delta Live Tables sink
oraz sposób, w jaki można go używać z przepływami DLT do zapisywania rekordów przekształconych przez potok do zewnętrznego ujścia danych, takiego jak tabele zarządzane przez Unity Catalog i tabele zewnętrzne, tabele metastore Hive oraz usługi przesyłania strumieniowego zdarzeń, takie jak Apache Kafka lub Azure Event Hubs.
Co to są ujścia tabel na żywo usługi Delta?
Ujścia dla Tabel Delta Live umożliwiają zapisywanie przekształconych danych w celach, takich jak usługi strumieniujące zdarzenia, jak Apache Kafka lub Azure Event Hubs, oraz tabele zewnętrzne zarządzane przez Unity Catalog lub metastore Hive. Wcześniej tabele przesyłania strumieniowego i zmaterializowane widoki utworzone w potoku Delta Live Tables mogły być utrwalane tylko w tabelach Delta zarządzanych przez usługę Azure Databricks. Dzięki wykorzystaniu narzędzi wyjściowych masz teraz więcej opcji utrwalania danych wyjściowych potoków Delta Live Tables.
Kiedy należy używać odbiorców Delta Live Tables?
Databricks zaleca korzystanie z źródeł danych Delta Live Tables, jeśli musisz:
- Utwórz przypadek użycia operacyjnego, taki jak wykrywanie oszustw, analiza w czasie rzeczywistym i rekomendacje klientów. Przypadki użycia operacyjnego zwykle odczytują dane z magistrali komunikatów, takich jak temat Apache Kafka, a następnie przetwarzają dane przy niskim opóźnieniu, zapisując przetworzone rekordy z powrotem do magistrali komunikatów. Takie podejście umożliwia osiągnięcie mniejszego opóźnienia, ponieważ nie zapisujesz ani nie odczytujesz danych z magazynu w chmurze.
- Zapisywanie przekształconych danych z przepływów Delta Live Tables do tabel zarządzanych przez zewnętrzny system Delta, w tym tabel zarządzanych przez Unity Catalog, tabel zewnętrznych i tabel magazynu metadanych Hive.
- Wykonaj odwrotne wyodrębnianie-przekształcanie-ładowanie (ETL) w ujściach zewnętrznych dla usługi Databricks, takich jak tematy platformy Apache Kafka. Takie podejście umożliwia efektywną obsługę przypadków użycia, w których dane muszą być odczytywane lub używane poza tabelami katalogu Unity lub innymi magazynami zarządzanymi przez usługę Databricks.
Jak korzystać z układów wyjściowych Delta Live Tables?
Notatka
- Obsługiwane są tylko zapytania przesyłane strumieniowo przy użyciu
spark.readStream
idlt.read_stream
. Zapytania wsadowe nie są obsługiwane. - Tylko
append_flow
jest używany do zapisywania w odbiornikach. Inne przepływy, takie jakapply_changes
, nie są obsługiwane. - Uruchomienie aktualizacji pełnego odświeżania nie powoduje wyczyszczenia wcześniej obliczonych danych wyników w ujściach. Oznacza to, że wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.
Ponieważ dane zdarzeń są pobierane ze źródła strumieniowania do potoku Delta Live Tables, przetwarzasz i udoskonalasz te dane, korzystając z funkcji Delta Live Tables, a następnie używasz przetwarzania przepływu dołączania, aby przesyłać strumieniowo przekształcone rekordy danych do ujścia Delta Live Tables. Tworzysz ten zlew przy użyciu funkcji create_sink()
. Aby uzyskać więcej informacji na temat korzystania z funkcji create_sink
, zapoznaj się z dokumentacją interfejsu API zlewu .
Aby zaimplementować docel Delta Live Tables, wykonaj następujące kroki:
- Skonfiguruj potok Delta Live Tables, aby przetwarzać dane zdarzeń przesyłanych strumieniowo i przygotować rekordy danych do zapisania w ujściu Delta Live Tables.
- Skonfiguruj i utwórz odbiornik dla tabel Delta Live Tables, aby użyć preferowanego formatu odbiornika docelowego.
- Użyj przepływu dołączania, aby zapisać przygotowane rekordy do odbiornika.
Te kroki zostały omówione w pozostałej części tematu.
Skonfiguruj potok Delta Live Tables w celu przygotowania rekordów do zapisu w miejscu docelowym
Pierwszym krokiem jest skonfigurowanie potoku Delta Live Tables w celu przekształcenia nieprzetworzonych danych strumienia zdarzeń na przygotowane dane, które zostaną zapisane w miejscu docelowym.
Aby lepiej zrozumieć ten proces, możesz skorzystać z tego przykładu potoku Delta Live Tables, który przetwarza dane kliknięć z przykładowych danych wikipedia-datasets
w usłudze Databricks. Ten potok analizuje surowe dane w celu zidentyfikowania stron Wikipedii, które łączą się ze stroną dokumentacji Apache Spark, i stopniowo ogranicza te dane do wierszy tabeli, w których link odwołujący się zawiera Apache_Spark.
W tym przykładzie potok Delta Live Tables jest zorganizowany w oparciu o architekturę medalionu , która strukturyzuje dane w różnych warstwach, aby zwiększyć jakość i efektywność przetwarzania.
Aby rozpocząć, załaduj nieprzetworzone rekordy JSON z zestawu danych do warstwy z brązu przy użyciu automatycznego modułu ładującego. Ten kod w języku Python pokazuje, jak utworzyć tabelę strumieniową o nazwie clickstream_raw
, która zawiera nieprzetworzone dane ze źródła:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Po uruchomieniu tego kodu dane są teraz na poziomie "brązu" (lub "surowych danych") architektury Medallion i trzeba je oczyścić. Kolejny krok to dopracowanie danych do poziomu "srebrnego", co obejmuje czyszczenie typów danych i nazw kolumn oraz korzystanie z oczekiwań funkcji Delta Live Tables, aby zapewnić integralność danych.
Poniższy kod pokazuje, jak to zrobić poprzez czyszczenie i sprawdzanie danych warstwy z brązu w tabeli srebrnej clickstream_clean
.
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Aby opracować "złotą" warstwę struktury potoku, należy przefiltrować oczyszczone dane strumienia kliknięć, aby odizolować wpisy, na których strona odwołująca się jest Apache_Spark
. W tym ostatnim przykładzie kodu wybierasz tylko kolumny niezbędne do zapisania w docelowej tabeli danych.
Poniższy kod ilustruje sposób tworzenia tabeli o nazwie spark_referrers
reprezentującej warstwę złota:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Po zakończeniu tego procesu przygotowywania danych należy skonfigurować ujścia docelowe, w których zostaną zapisane oczyszczone rekordy.
Konfigurowanie wyjścia Delta Live Tables
Usługa Databricks obsługuje trzy typy ujściów docelowych, w których zapisujesz rekordy przetwarzane z danych strumienia:
- Ujścia tabeli delty
- Ujścia platformy Apache Kafka
- Ujścia usługi Azure Event Hubs
Poniżej przedstawiono przykłady konfiguracji dla ujściów usługi Delta, Kafka i Azure Event Hubs:
Deltowe zagłębienia
Aby utworzyć ujście Delta według ścieżki pliku:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Aby utworzyć ujście delty według nazwy tabeli przy użyciu w pełni kwalifikowanej ścieżki wykazu i schematu:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Ujścia platformy Kafka i usługi Azure Event Hubs
Ten kod działa zarówno z kanałami wyjściowymi Apache Kafka, jak i Azure Event Hubs.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Po skonfigurowaniu ujścia i przygotowaniu potoku delta Live Tables można rozpocząć przesyłanie strumieniowe przetworzonych rekordów do ujścia.
zapis do Delta Live Tables z przepływem z dołączaniem
Po skonfigurowaniu ujścia następnym krokiem jest zapisanie przetworzonych rekordów, określając je jako element docelowy dla rekordów wyjściowych przez przepływ dołączania. W tym celu należy określić zlew jako wartość target
w dekoratorze append_flow
.
- W przypadku zarządzanych i zewnętrznych tabel Unity Catalog użyj formatu
delta
i określ ścieżkę lub nazwę tabeli w opcjach. Pipelines Delta Live Tables muszą być skonfigurowane do korzystania z Unity Catalog. - W przypadku tematów platformy Apache Kafka użyj formatu
kafka
i określ nazwę tematu, informacje o połączeniu i informacje dotyczące uwierzytelniania w opcjach konfiguracyjnych. Te same opcje są obsługiwane przez wyjście Kafka dla strumieniowego przetwarzania danych w strukturze Spark. Zobacz Konfiguracja zapisywarki strumieniowej Kafka. - W przypadku usługi Azure Event Hubs użyj formatu
kafka
i określ nazwę usługi Event Hubs, informacje o połączeniu i informacje dotyczące uwierzytelniania w opcjach. To te same opcje, które są obsługiwane w miejscu docelowym Event Hubs w Spark Structured Streaming, które korzysta z interfejsu Kafka. Zobacz Uwierzytelnianie jednostki usługi przy użyciu identyfikatora Entra firmy Microsoft i usługi Azure Event Hubs. - W przypadku tabel magazynu metadanych Hive użyj formatu
delta
i określ ścieżkę lub nazwę tabeli w opcjach. Potoki Delta Live Tables muszą być skonfigurowane do korzystania z magazynu metadanych Hive.
Poniżej przedstawiono przykłady konfigurowania przepływów do zapisywania do miejsc docelowych Delta, Kafka i Azure Event Hubs z rekordami przetwarzanymi przez pipeline Delta Live Tables.
Zlew delta
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
"Synki platformy Kafka i usługi Azure Event Hubs"
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Parametr value
jest obowiązkowy dla ujścia usługi Azure Event Hubs. Dodatkowe parametry, takie jak key
, partition
, headers
i topic
, są opcjonalne.
Aby uzyskać więcej informacji na temat dekoratora append_flow
, zobacz "Użyj przepływu dodawania, aby zapisywać do tabeli strumieniowej z wielu źródeł".
Ograniczenia
Obsługiwany jest tylko interfejs API języka Python. Język SQL nie jest obsługiwany.
Obsługiwane są tylko zapytania przesyłane strumieniowo przy użyciu
spark.readStream
idlt.read_stream
. Zapytania wsadowe nie są obsługiwane.Tylko
append_flow
można użyć do zapisywania w ujściach. Inne przepływy, takie jakapply_changes
, nie są obsługiwane i nie można użyć ujścia w definicji zestawu danych Delta Live Tables. Na przykład, następujące nie jest obsługiwane:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
W przypadku zlewów Delta nazwa tabeli musi być pełną nazwą kwalifikowaną. W szczególności w przypadku zarządzanych przez Unity Catalog tabel zewnętrznych, nazwa tabeli musi mieć postać
<catalog>.<schema>.<table>
. W przypadku magazynu metadanych Hive musi on znajdować się w postaci<schema>.<table>
.Uruchomienie
FullRefresh
nie spowoduje wyczyszczenia wcześniej obliczonych danych wyników w odbiornikach. Oznacza to, że wszystkie ponownie przetworzone dane zostaną dołączone do ujścia, a istniejące dane nie zostaną zmienione.Oczekiwania dotyczące Delta Live Tables nie są obsługiwane.
Zasoby
- Tworzenie potoków Delta Live Tables
- Ładowanie i przetwarzanie danych przyrostowo przy użyciu przepływów Delta Live Tables
- dokumentacja interfejsu API ujścia języka Python