Streamování záznamů do externích služeb pomocí DLT uzlů
Důležitý
Rozhraní DLT sink
API je ve verzi Public Preview.
Tento článek popisuje rozhraní API pro sink
DLT a jeho použití s toky DLT k zápisu záznamů transformovaných kanálem do externí jímky dat, jako jsou spravované katalogy Unity a externí tabulky, tabulky metastoru Hive a služby streamování událostí, jako jsou Apache Kafka nebo Azure Event Hubs.
Co jsou jímky DLT?
Jímky DLT umožňují zapisovat transformovaná data do cílů, jako jsou služby streamování událostí, jako jsou Apache Kafka nebo Azure Event Hubs, a externí tabulky spravované katalogem Unity nebo metastorem Hive. Dříve bylo možné streamované tabulky a materializovaná zobrazení vytvořená v kanálu DLT zachovat pouze do tabulek Delta spravovaných službou Azure Databricks. Pomocí jímek teď máte další možnosti pro zachování výstupu kanálů DLT.
Kdy mám používat jímky DLT?
Databricks doporučuje používat jímky DLT, pokud potřebujete:
- Vytvořte provozní případ použití, jako je detekce podvodů, analýzy v reálném čase a doporučení zákazníků. Provozní případy použití obvykle čtou data ze sběrnice zpráv, například z tématu Apache Kafka, poté tato data zpracovávají s nízkou latencí a zpracované záznamy zapisují zpět do sběrnice zpráv. Tento přístup umožňuje dosáhnout nižší latence tím, že nezapisujete nebo nečtete z cloudového úložiště.
- Zapisujte transformovaná data z toků DLT do tabulek spravovaných externí instancí Delta, včetně tabulek spravovaných Unity Catalog, externích tabulek a tabulek v Hive metastore.
- Proveďte reverzní ETL do externích cílových úložišť pro Databricks, jako jsou témata Apache Kafka. Tento přístup umožňuje efektivně podporovat případy použití, kdy je potřeba data číst nebo používat mimo tabulky katalogu Unity nebo jiné úložiště spravované službou Databricks.
Jak se používají jímky DLT?
Poznámka
- Podporují se jenom dotazy streamování používající
spark.readStream
adlt.read_stream
. Dávkové dotazy nejsou podporovány. - K zápisu do sínků je možné použít pouze
append_flow
. Jiné toky, napříkladapply_changes
, se nepodporují. - Spuštění úplné aktualizace nevymaže dříve vypočítaná data výsledků ve vstupech. To znamená, že se do jímky připojí všechna znovu zpracovaná data a stávající data nebudou změněna.
Když jsou data událostí ingestována ze streamovacího zdroje do vaší DLT pipeline, zpracujete a zpřesníte tato data pomocí funkcionality DLT a poté použijete zpracování přírustkového toku ke streamování transformovaných datových záznamů do úložiště DLT. Tuto jímku vytvoříte pomocí funkce create_sink()
. Další podrobnosti o použití funkce create_sink
najdete v referenčních informacích k rozhraní API jímky .
K implementaci jímky DLT použijte následující kroky:
- Nastavte kanál DLT pro zpracování streamovaných dat událostí a přípravu datových záznamů pro zápis do jímky DLT.
- Nakonfigurujte a vytvořte jímku DLT tak, aby používala upřednostňovaný cílový formát jímky.
- Pomocí připojte tok k zápisu připravených záznamů do jímky.
Tyto kroky jsou popsané ve zbývající části tématu.
Nastavení kanálu DLT pro přípravu záznamů k zápisu do úložiště
Prvním krokem je nastavení kanálu DLT pro transformaci nezpracovaných dat proudu událostí na připravená data, která zapíšete do úložiště.
Pokud chcete lépe porozumět tomuto procesu, můžete postupovat podle tohoto příkladu kanálu DLT, který zpracovává data událostí clickstream z wikipedia-datasets
ukázkových dat v Databricks. Tento kanál analyzuje nezpracovanou datovou sadu a identifikuje stránky Wikipedie, které odkazují na stránku dokumentace Apache Sparku, a postupně zpřesňuje tato data pouze na řádky tabulky, kde odkazující odkaz obsahuje Apache_Spark.
V tomto příkladu je kanál DLT strukturovaný pomocí architektury medallion, která uspořádá data do různých vrstev, aby se zlepšila efektivita kvality a zpracování.
Začněte tím, že načtete nezpracované záznamy JSON z datové sady do bronzové vrstvy pomocí Automatického Zavaděče. Tento kód Pythonu ukazuje, jak vytvořit streamovací tabulku s názvem clickstream_raw
, která obsahuje nezpracovaná data ze zdroje:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Po spuštění tohoto kódu jsou data nyní na úrovni "bronzových" (nebo "nezpracovaných dat") architektury Medallion a musí být vyčištěna. Další krok zpřesní data na "stříbrnou" úroveň, která zahrnuje vyčištění datových typů a názvů sloupců a použití očekávání DLT k zajištění integrity dat.
Následující kód ukazuje, jak to provést vyčištěním a ověřením dat bronzové vrstvy do tabulky clickstream_clean
silver:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Pokud chcete vyvinout vrstvu „zlatá“ struktury zpracování, filtrujete vyčištěná clickstream data, abyste izolovali položky, ve kterých je odkazovaná stránka Apache_Spark
. V tomto posledním příkladu kódu vyberete pouze sloupce potřebné k zápisu do cílové tabulky jímky.
Následující kód ukazuje, jak vytvořit tabulku s názvem spark_referrers
představující zlatou vrstvu:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Po dokončení tohoto procesu přípravy dat musíte nakonfigurovat cílové jímky, do kterých se zapíšou vyčištěné záznamy.
Konfigurace jímky DLT
Databricks podporuje tři typy cílových jímek, do kterých zapisujete záznamy zpracovávané z dat datového proudu:
- Jímky tabulky Delta
- Jímky Apache Kafka
- Jímky služby Azure Event Hubs
Níže jsou uvedeny příklady konfigurací pro jímky Delta, Kafka a Azure Event Hubs:
Delta jímky
Postup vytvoření jímky Delta podle cesty k souboru:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Vytvoření jímky Delta podle názvu tabulky pomocí plně kvalifikovaného katalogu a cesty schématu:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Jímky Kafka a Azure Event Hubs
Tento kód funguje pro jímky Apache Kafka i Azure Event Hubs.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Teď, když je vaše jímka nakonfigurovaná a kanál DLT je připravený, můžete začít streamovat zpracovávané záznamy do jímky.
Zápis do DLT úložiště s procesem připojení dat
Když je jímka nakonfigurovaná, dalším krokem je zápis zpracovaných záznamů tak, že ho zadáte jako cíl pro výstup záznamů přidávacím tokem. Proveďte to specifikováním jímky jako hodnoty target
v dekorátoru append_flow
.
- U spravovaných a externích tabulek v Katalogu Unity použijte formát
delta
a v možnostech zadejte cestu nebo název tabulky. Vaše kanály DLT musí být nakonfigurované tak, aby používaly katalog Unity. - V případě témat Apache Kafka použijte formát
kafka
a v možnostech zadejte název tématu, informace o připojení a ověřovací informace. Jedná se o stejné možnosti, které podporuje jímka Kafka strukturovaného streamování Sparku. Viz Konfigurace zapisovače strukturovaného streamování Kafka. - Pro Službu Azure Event Hubs použijte formát
kafka
a v možnostech zadejte název služby Event Hubs, informace o připojení a ověřovací informace. Jedná se o stejné možnosti podporované v jímce Event Hubs strukturovaného streamování Sparku, která používá rozhraní Kafka. Viz ověřování služebního principálu pomocí Microsoft Entra ID a Azure Event Hubs. - U tabulek metastoru Hive použijte formát
delta
a v možnostech zadejte cestu nebo název tabulky. Vaše kanály DLT musí být nakonfigurované tak, aby používaly metastor Hive.
Níže jsou uvedeny příklady nastavení toků pro zápis do jímek Delta, Kafka a Azure Event Hubs se záznamy zpracovanými kanálem DLT.
Delta jímka
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Jímky Kafka a Azure Event Hubs
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Parametr value
je povinný pro jímku služby Azure Event Hubs. Další parametry, jako jsou key
, partition
, headers
a topic
, jsou volitelné.
Další podrobnosti o dekorátoru append_flow
najdete v tématu Použití přidávacího toku k zápisu do streamované tabulky z více zdrojových datových proudů.
Omezení
Podporuje se jenom rozhraní Python API. SQL není podporován.
Podporují se jenom dotazy streamování používající
spark.readStream
adlt.read_stream
. Dávkové dotazy nejsou podporovány.K zápisu do jímek je možné použít pouze
append_flow
. Jiné toky, napříkladapply_changes
, nejsou podporované a v definici datové sady DLT nemůžete použít jímku. Například následující se nepodporuje:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
U jímek Delta musí být název tabulky plně kvalifikovaný. Konkrétně v případě spravovaných externích tabulek Katalogu Unity musí být název tabulky ve formátu
<catalog>.<schema>.<table>
. Pro metastor Hive musí být ve formátu<schema>.<table>
.Spuštění
FullRefresh
nevyčistí dříve vypočítaná data výsledků v datových úložištích. To znamená, že se do jímky připojí všechna znovu zpracovaná data a stávající data nebudou změněna.Očekávání DLT nejsou podporována.