Sdílet prostřednictvím


Streamování záznamů do externích služeb pomocí výstupů Delta Live Tables

Důležitý

Rozhraní API sink Delta Live Tables je v Public Preview.

Tento článek popisuje rozhraní API Delta Live Tables sink a jeho použití s toky DLT k zápisu záznamů transformovaných pomocí pipeline do externího datového úložiště, jako jsou spravované a externí tabulky Unity Catalog, tabulky metastore Hive a služby streamování událostí, jako jsou Apache Kafka nebo Azure Event Hubs.

Co jsou jímky delta živých tabulek?

Výstupy Delta Live Tables umožňují zapisovat transformovaná data do cílů, jako jsou služby streamování událostí, například Apache Kafka nebo Azure Event Hubs, a do externích tabulek spravovaných katalogem Unity nebo metastorem Hive. Dříve bylo možné streamované tabulky a materializovaná zobrazení vytvořená v kanálu Delta Live Tables zachovat pouze u spravovaných tabulek Delta ve službě Azure Databricks. Pomocí jímek teď máte další možnosti pro zachování výstupu kanálů Delta Live Tables.

Kdy mám použít výstupy pro Delta Live Tables?

Databricks doporučuje používat jímky dynamických tabulek Delta, pokud potřebujete:

  • Vytvořte provozní případ použití, jako je detekce podvodů, analýzy v reálném čase a doporučení zákazníků. Provozní případy použití obvykle čtou data z sběrnice zpráv, například z tématu Apache Kafka, a pak zpracovávají tato data s nízkou latencí a zapisují zpracované záznamy zpět do sběrnice zpráv. Tento přístup umožňuje dosáhnout nižší latence tím, že nezapisujete nebo nečtete z cloudového úložiště.
  • Zapisujte transformovaná data z toků Delta Live Tables do tabulek spravovaných externí instancí Delta, včetně tabulek spravovaných katalogem Unity, externích tabulek a tabulek v metastoru Hive.
  • Proveďte reverzní extrakci-transformaci-nahrávání (ETL) do externích úložišť mimo Databricks, jako jsou témata Apache Kafka. Tento přístup umožňuje efektivně podporovat případy použití, kdy je potřeba data číst nebo používat mimo tabulky katalogu Unity nebo jiné úložiště spravované službou Databricks.

Jak se používají jímky Delta Live Tables?

Poznámka

  • Podporují se jenom dotazy streamování používající spark.readStream a dlt.read_stream. Dávkové dotazy nejsou podporovány.
  • K zápisu do jímek je možné použít pouze append_flow. Jiné toky, například apply_changes, se nepodporují.
  • Spuštění úplné aktualizace nevyčistí dříve vypočítaná data výsledků v jímkách. To znamená, že se do jímky připojí všechna znovu zpracovaná data a stávající data nebudou změněna.

Když jsou data událostí ingestována ze streamovacího zdroje do pipeline Delta Live Tables, zpracováváte a zpřesňujete tato data pomocí funkčnosti Delta Live Tables a poté používáte zpracování přídatného toku k streamování transformovaných datových záznamů do úložiště Delta Live Tables. Tuto jímku vytvoříte pomocí funkce create_sink(). Další podrobnosti o použití funkce create_sink najdete v referenčních informacích k rozhraní API jímky .

K implementaci jímky Delta Live Tables použijte následující kroky:

  1. Nastavte kanál Delta Live Tables pro zpracování streamovaných dat událostí a přípravu datových záznamů pro zápis do jímky Delta Live Tables.
  2. Nakonfigurujte a vytvořte jímku Delta Live Tables tak, aby používala upřednostňovaný cílový formát jímky.
  3. Pomocí připojte tok k zápisu připravených záznamů do jímky.

Tyto kroky jsou popsané ve zbývající části tématu.

Nastavení kanálu Delta Live Tables pro přípravu záznamů pro zápis do jímky

Prvním krokem je nastavení potrubí Delta Live Tables pro transformaci surových dat z datového proudu událostí na připravená data, která zapíšete do svého cílového úložiště.

Pokud chcete lépe porozumět tomuto procesu, můžete postupovat podle tohoto příkladu potrubí Delta Live Tables, které zpracovává data událostí clickstream z ukázkových dat wikipedia-datasets v rámci Databricks. Tento kanál analyzuje nezpracovanou datovou sadu a identifikuje stránky Wikipedie, které odkazují na stránku dokumentace Apache Sparku, a postupně zpřesňuje tato data pouze na řádky tabulky, kde odkazující odkaz obsahuje Apache_Spark.

V tomto příkladu je kanál Delta Live Tables strukturovaný pomocí architektury medallion, která uspořádá data do různých vrstev, aby se zlepšila efektivita kvality a zpracování.

Začněte tak, že načtete nezpracované záznamy JSON z datové sady do bronzové vrstvy použitím Auto Loader. Tento kód Pythonu ukazuje, jak vytvořit streamovanou tabulku s názvem clickstream_raw, která obsahuje nezpracovaná data ze zdroje:

import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

Po spuštění tohoto kódu jsou data nyní na úrovni "bronzových" (nebo "nezpracovaných dat") architektury Medallion a musí být vyčištěna. V dalším kroku se data zpřesní na úroveň "silver", která zahrnuje vyčištění datových typů a názvů sloupců a použití očekávání Delta Live Tables k zajištění integrity dat.

Následující kód ukazuje, jak to provést vyčištěním a ověřením dat bronzové vrstvy do tabulky clickstream_clean silver:

@dlt.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   spark.readStream.table("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

Pokud chcete vytvořit vrstvu "gold" struktury kanálu, filtrujete vyčištěná data clickstream, abyste izolovali položky, ve kterých je odkazovaná stránka Apache_Spark. V tomto posledním příkladu kódu vyberete pouze sloupce potřebné k zápisu do cílové tabulky jímky.

Následující kód ukazuje, jak vytvořit tabulku s názvem spark_referrers představující zlatou vrstvu:

@dlt.table(
 comment="A table of the most common pages that link to the Apache Spark page.",
 table_properties={
   "quality": "gold"
 }
)
def spark_referrers():
 return (
   spark.readStream.table("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

Po dokončení tohoto procesu přípravy dat musíte nakonfigurovat cílové jímky, do kterých se zapíšou vyčištěné záznamy.

Konfigurace výstupu pro Delta Live Tables

Databricks podporuje tři typy cílových jímek, do kterých zapisujete záznamy zpracovávané z dat datového proudu:

  • Zápustná umyvadla Delta
  • Jímky Apache Kafka
  • Jímky služby Azure Event Hubs

Níže jsou uvedeny příklady konfigurací pro jímky Delta, Kafka a Azure Event Hubs:

Delta jímky

Postup vytvoření jímky Delta podle cesty k souboru:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

Vytvoření jímky Delta podle názvu tabulky pomocí plně kvalifikovaného katalogu a cesty schématu:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)

Jímky Kafka a Azure Event Hubs

Tento kód funguje pro jímky Apache Kafka i Azure Event Hubs.

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

Teď, když je vaše jímka nakonfigurovaná a kanál Delta Live Tables je připravený, můžete začít streamovat zpracovávané záznamy do jímky.

Psaní do úložiště Delta Live Tables s přidáním toku

Když je jímka nakonfigurovaná, dalším krokem je zápis zpracovaných záznamů tak, že ho zadáte jako cíl pro výstup záznamů přidávacím tokem. Provedete to zadáním jímky jako hodnoty target v dekorátoru append_flow.

  • U spravovaných a externích tabulek v Katalogu Unity použijte formát delta a v možnostech zadejte cestu nebo název tabulky. Kanály Delta Live Tables musí být nakonfigurované tak, aby používaly katalog Unity.
  • V případě témat Apache Kafka použijte formát kafka a v možnostech zadejte název tématu, informace o připojení a ověřovací informace. Jedná se o stejné možnosti, které podporuje jímka Kafka strukturovaného streamování Sparku. Viz Konfigurace zapisovače strukturovaného streamování Kafka.
  • Pro Službu Azure Event Hubs použijte formát kafka a v možnostech zadejte název služby Event Hubs, informace o připojení a ověřovací informace. Jedná se o stejné možnosti podporované v jímce Event Hubs strukturovaného streamování Sparku, která používá rozhraní Kafka. Podívejte se na ověřování služebního principálu s Microsoft Entra ID a Azure Event Hubs.
  • U tabulek metastoru Hive použijte formát delta a v možnostech zadejte cestu nebo název tabulky. Kanály Delta Live Tables musí být nakonfigurované tak, aby používaly metadatové úložiště Hive.

Níže jsou uvedeny příklady nastavení toků pro zápis do jímek Delta, Kafka a Azure Event Hubs se záznamy zpracovanými kanálem Delta Live Tables.

Rozdílová jímka

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Jímky Kafka a Azure Event Hubs

@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

Parametr value je povinný pro jímku služby Azure Event Hubs. Další parametry, jako jsou key, partition, headersa topic, jsou volitelné.

Další podrobnosti o dekorátoru append_flow najdete v tématu Použití přidávacího toku k zápisu do streamované tabulky z více zdrojových datových proudů.

Omezení

  • Podporuje se jenom rozhraní Python API. SQL není podporován.

  • Podporují se jenom dotazy streamování používající spark.readStream a dlt.read_stream. Dávkové dotazy nejsou podporovány.

  • K zápisu do jímek je možné použít pouze append_flow. Jiné toky, například apply_changes, nejsou podporované a v definici datové sady Delta Live Tables nemůžete použít jímku. Například následující se nepodporuje:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • U jímek Delta musí být název tabulky plně kvalifikovaný. Konkrétně v případě spravovaných externích tabulek Katalogu Unity musí být název tabulky ve formátu <catalog>.<schema>.<table>. Pro metastore Hive musí být ve formátu <schema>.<table>.

  • Spuštění FullRefresh nevymaže dříve vypočítaná výsledková data v úložištích. To znamená, že se do jímky připojí všechna znovu zpracovaná data a stávající data nebudou změněna.

  • Delta Live Tables očekávání nejsou podporována.

Prostředky