Sdílet prostřednictvím


Čtení a zápisy streamovaných tabulek Delta

Delta Lake je hluboce integrovaná se strukturovaným streamováním Sparku prostřednictvím readStream a writeStream. Delta Lake překonává řadu omezení, která jsou obvykle spojena se systémy a soubory streamování, včetně těchto:

  • Shodování malých souborů vytvořených ingestováním s nízkou latencí
  • Udržování zpracování "přesně jednou" s více než jedním datovým proudem (nebo souběžnými dávkovými úlohami).
  • Efektivní zjišťování nových souborů při použití souborů jako zdroje datového proudu

Poznámka:

Tento článek popisuje použití tabulek Delta Lake jako zdrojů streamování a jímek. Informace o načítání dat pomocí streamovaných tabulek v Databricks SQL najdete v tématu Načtení dat pomocí streamovaných tabulek v Databricks SQL.

Informace o statických spojeních datových proudů pomocí Delta Lake najdete v tématu Stream-static joins.

tabulka Delta jako zdroj

Strukturované streamování přírůstkově čte tabulky Delta. Zatímco je proti tabulce Delta aktivní streamovací dotaz, nové záznamy se zpracovávají idempotentním způsobem, když se nové verze zapisují do zdrojové tabulky.

Následující příklady kódu ukazují konfiguraci streamovaného čtení pomocí názvu tabulky nebo cesty k souboru.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Důležité

Pokud se schéma tabulky Delta změní po zahájení streamovaného čtení z tabulky, dotaz selže. U většiny změn schématu můžete stream restartovat, abyste vyřešili neshodu schématu a pokračovali ve zpracování.

V Databricks Runtime 12.2 LTS a níže nelze streamovat z tabulky Delta s povoleným mapováním sloupců, u kterého došlo k vývoji nesoudatného schématu, jako je přejmenování nebo vyřazení sloupců. Podrobnosti najdete v tématu Streamování s mapováním sloupců a změnami schématu.

Omezení vstupní rychlosti

Pro řízení mikrodávek jsou k dispozici následující možnosti:

  • maxFilesPerTrigger: Kolik nových souborů se má brát v úvahu v každé mikrodávce. Výchozí hodnota je 1 000.
  • maxBytesPerTrigger: Kolik dat se zpracuje v každé mikrodávce. Tato možnost nastaví "soft max", což znamená, že dávkové procesy zpracovávají přibližně toto množství dat a mohou překročit limit, aby umožnily pokračování streamovacího dotazu, pokud je nejmenší vstupní jednotka větší než tento limit. Tato možnost není ve výchozím nastavení nastavená.

Pokud používáte maxBytesPerTrigger ve spojení s maxFilesPerTrigger, mikrodávkový proces zpracovává data, dokud nedosáhne limitu jednoho z maxFilesPerTrigger nebo maxBytesPerTrigger.

Poznámka:

V případech, kdy jsou transakce zdrojové tabulky vyčištěny kvůli konfiguraci logRetentionDuration a streamovací dotaz se pokusí zpracovat tyto verze, dotaz ve výchozím nastavení se nezdaří zabránit ztrátě dat. Možnost failOnDataLoss můžete nastavit tak, aby false ignorovala ztracená data a pokračovala ve zpracování.

Streamování kanálu CDC (Delta Lake Change Data Capture)

Delta Lake změnový datový kanál zaznamenává změny v tabulce Delta, včetně aktualizací a odstranění. Pokud je tato možnost povolená, můžete streamovat z datového kanálu změn a zapisovat logiku pro zpracování vkládání, aktualizací a odstraňování do podřízených tabulek. I když se výstup dat z kanálu datových změn mírně liší od tabulky Delta, kterou popisuje, poskytuje řešení pro šíření přírůstkových změn do podřízených tabulek v architektuře medailiónu.

Důležité

V Databricks Runtime 12.2 LTS a starší nemůžete streamovat z datového kanálu změn pro tabulku Delta s aktivním mapováním sloupců, u které došlo k neaditivní evoluci schématu, jako je přejmenování nebo vyřazení sloupců. Viz Streamování s mapováním sloupců a změnami schématu.

Ignorování aktualizací a odstranění

Strukturované streamování nezpracuje vstup, který není připojením, a vyvolá výjimku, pokud dojde k nějakým změnám v tabulce, která se používá jako zdroj. Existují dvě hlavní strategie pro řešení změn, které nelze automaticky rozšířit do podřízeného procesu:

  • Výstup a kontrolní bod můžete odstranit a restartovat stream od začátku.
  • Můžete nastavit jednu z těchto dvou možností:
    • ignoreDeletes: Přehlížejte transakce, které odstraňují data na hranicích oddílů.
    • skipChangeCommits: ignorujte transakce, které odstraňují nebo upravují existující záznamy. skipChangeCommits zahrnuje ignoreDeletes.

Poznámka:

Ve službě Databricks Runtime 12.2 LTS a novějších skipChangeCommits je předchozí nastavení ignoreChangeszastaralé . V Databricks Runtime 11.3 LTS a nižší ignoreChanges je jedinou podporovanou možností.

Sémantika pro ignoreChanges se výrazně liší od skipChangeCommits. Pokud je povoleno ignoreChanges, přepsané datové soubory ve zdrojové tabulce jsou znovu vydány po operaci změny dat, jako je UPDATE, MERGE INTO, DELETE (v rámci oddílů) nebo OVERWRITE. Nezměněné řádky se často generují společně s novými řádky, takže podřízení příjemci musí být schopni zpracovávat duplicity. Odstranění se nešíří v podřízené části. ignoreChanges zahrnuje ignoreDeletes.

skipChangeCommits zcela ignoruje operace změny souborů. Datové soubory, které se přepíšou ve zdrojové tabulce z důvodu operace změny dat, jako jsou UPDATE, MERGE INTO, DELETEa OVERWRITE, se zcela ignorují. Aby se projevily změny v nadřazených zdrojových tabulkách, musíte implementovat samostatnou logiku pro šíření těchto změn.

Úlohy nakonfigurované ignoreChanges tak, aby nadále fungovaly pomocí známé sémantiky, ale Databricks doporučuje používat skipChangeCommits pro všechny nové úlohy. Migrace úloh, které používají ignoreChanges k tomu, aby skipChangeCommits vyžadovaly logiku refaktoringu

Příklad

Předpokládejme například, že máte tabulku user_events s date, user_emaila action sloupce rozdělené podle date. Streamujete z tabulky user_events a potřebujete z ní odstranit data z důvodu GDPR.

Když odstraníte na hranicích oddílů (to znamená, že WHERE je na sloupcovém oddílu), soubory jsou už segmentované podle hodnoty, takže odstranění jednoduše vyloučí tyto soubory z metadat. Když odstraníte celý oddíl dat, můžete použít následující:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Pokud odstraníte data v několika oddílech (v tomto příkladu filtrování user_email), použijte následující syntaxi:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Pokud aktualizujete user_email příkazem UPDATE, přepíše se soubor obsahující příslušný user_email. Slouží skipChangeCommits k ignorování změněně datových souborů.

Zadat počáteční pozici

Pomocí následujících možností můžete určit výchozí bod zdroje streamování Delta Lake bez zpracování celé tabulky.

  • startingVersion: Verze Delta Lake, od které se má začít. Databricks doporučuje tuto možnost vynechat pro většinu úloh. Pokud není nastaven, datový proud začíná od nejnovější dostupné verze, včetně kompletního snímku stavu tabulky v danou chvíli.

    Pokud je zadaný, datový proud přečte všechny změny v tabulce Delta počínaje zadanou verzí (včetně). Pokud už zadaná verze není dostupná, stream se nespustí. Verze potvrzení můžete získat ze sloupce version výstupu příkazu DESCRIBE HISTORY.

    Chcete-li vrátit pouze nejnovější změny, zadejte latest.

  • startingTimestamp: Časové razítko, od které se má začít. Všechny změny tabulky potvrzené při nebo po časovém razítku (včetně) jsou přečteny streamovacím čtečem. Pokud zadané časové razítko předchází všem potvrzením tabulky, streamované čtení začne s nejstarším dostupným časovým razítkem. Jedna z těchto možností:

    • Řetězec časového razítka. Například "2019-01-01T00:00:00.000Z".
    • Řetězec kalendářního data. Například "2019-01-01".

Obě možnosti nelze nastavit současně. Projeví se pouze při spuštění nového streamovacího dotazu. Pokud se spustil dotaz streamování a průběh se zaznamenal ve svém kontrolním bodu, tyto možnosti se ignorují.

Důležité

I když zdroj streamování můžete spustit ze zadané verze nebo časového razítka, schéma zdroje streamování je vždy nejnovější schéma tabulky Delta. Po zadané verzi nebo časovém razítku je nutné zajistit, aby nedošlo k žádné nekompatibilní změně schématu tabulky Delta. Jinak může zdroj streamování vrátit nesprávné výsledky při čtení dat s nesprávným schématem.

Příklad

Předpokládejme například, že máte tabulku user_events. Pokud chcete číst změny od verze 5, použijte:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Pokud chcete číst změny od 10. 10. 2018, použijte:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Zpracování počátečního snímku bez vyřazení dat

Tato funkce je dostupná pro Databricks Runtime 11.3 LTS a vyšší.

Při použití tabulky Delta jako zdroje datového proudu dotaz nejprve zpracuje všechna data, která jsou v tabulce. Tabulka Delta v této verzi se nazývá počáteční snímek. Ve výchozím nastavení se datové soubory tabulky Delta zpracovávají na základě toho, který soubor byl naposledy změněn. Čas poslední změny však nemusí nutně představovat pořadí času události záznamu.

Ve stavovém streamovacím dotazu s definovaným vodoznakem může zpracování souborů časem úprav vést ke zpracování záznamů v nesprávném pořadí. To by mohlo vést k tomu, že záznamy budou považovány za pozdní události podle vodoznaku.

Problém s odstraněním dat se můžete vyhnout povolením následující možnosti:

  • withEventTimeOrder: Určuje, zda má být počáteční snímek zpracován s časovým pořadím událostí.

Pokud je povolené časové pořadí událostí, časový rozsah počátečních dat snímků je rozdělený do časových intervalů. Každá mikrodávka zpracovává kbelík filtrováním dat v rámci časového rozsahu. Možnosti konfigurace maxFilesPerTrigger a maxBytesPerTrigger se stále vztahují k řízení velikosti mikrobatchu, ale pouze přibližným způsobem kvůli povaze zpracování.

Následující obrázek ukazuje tento proces:

Počáteční snímek

Důležité informace o této funkci:

  • K problému s poklesem dat dochází pouze v případě, že se ve výchozím pořadí zpracuje počáteční snímek stavového streamovacího dotazu.
  • Po spuštění dotazu streamu nelze změnit withEventTimeOrder , když se počáteční snímek stále zpracovává. Pokud chcete restartovat změny withEventTimeOrder , musíte kontrolní bod odstranit.
  • Pokud spouštíte dotaz streamu s povoleným parametremEventTimeOrder, nemůžete ho downgradovat na verzi DBR, která tuto funkci nepodporuje, dokud se počáteční zpracování snímku nedokončí. Pokud potřebujete downgradovat, můžete počkat na dokončení počátečního snímku nebo odstranit kontrolní bod a restartovat dotaz.
  • Tato funkce není podporována v následujících neobvyklých scénářích:
    • Sloupec času události je generovaný sloupec a mezi zdrojem Delta a vodoznakem existují neprojekční transformace.
    • V dotazu streamu je vodoznak, který obsahuje více než jeden zdroj Delta.
  • Pokud je povolené časové pořadí událostí, může být výkon počátečního zpracování snímků Delta pomalejší.
  • Každá mikro batch prohledá počáteční snímek a vyfiltruje data v odpovídajícím časovém rozsahu událostí. Pro rychlejší akci filtrování se doporučuje použít zdrojový sloupec Delta jako čas události, aby bylo možné aplikovat přeskakování dat (zkontrolujte Přeskakování dat pro Delta Lake, kdy je to použitelné). Kromě toho může dělení tabulky ve sloupci času události dále urychlit zpracování. Můžete zkontrolovat uživatelské rozhraní Sparku a zjistit, kolik rozdílových souborů se kontroluje pro konkrétní mikrodávku.

Příklad

Předpokládejme, že máte tabulku user_events se sloupcem event_time. Dotaz streamování je agregační dotaz. Pokud chcete zajistit, aby během počátečního zpracování snímků nedošlo k žádnému poklesu dat, můžete použít:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Poznámka:

Můžete to také povolit pomocí konfigurace Sparku v clusteru, který bude platit pro všechny dotazy streamování: spark.databricks.delta.withEventTimeOrder.enabled true

tabulky Delta jako jímky

Data můžete také zapisovat do tabulky Delta pomocí strukturovaného streamování. Transakční protokol umožňuje službě Delta Lake zaručit přesně jedno zpracování, a to i v případě, že v tabulce běží souběžně jiné streamy nebo dávkové dotazy.

Poznámka:

Funkce Delta Lake VACUUM odebere všechny soubory, které nespravuje Delta Lake, ale přeskočí všechny adresáře, které začínají _. Kontrolní body můžete bezpečně ukládat společně s dalšími daty a metadaty tabulky Delta pomocí adresářové struktury, jako je například <table-name>/_checkpoints.

Metriky

Počet bajtů a počet souborů, které se ještě mají zpracovat v procesu dotazu streamování, můžete zjistit jako metriky numBytesOutstanding a numFilesOutstanding počet bajtů. Mezi další metriky patří:

  • numNewListedFiles: Počet souborů Delta Lake, které byly uvedeny pro výpočet backlogu pro tuto dávku.
    • backlogEndOffset: Verze tabulky použitá k výpočtu backlogu.

Pokud stream spouštíte v poznámkovém bloku, můžete tyto metriky zobrazit na kartě Nezpracovaná data na řídicím panelu průběhu dotazu streamování:

{
  "sources": [
    {
      "description": "DeltaSource[file:/path/to/source]",
      "metrics": {
        "numBytesOutstanding": "3456",
        "numFilesOutstanding": "8"
      }
    }
  ]
}

Režim připojení

Ve výchozím nastavení se datové proudy spouští v režimu připojení, který přidává nové záznamy do tabulky.

Při streamování do tabulek použijte metodu toTable, jak je znázorněno v následujícím příkladu:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Režim dokončení

Pomocí strukturovaného streamování můžete také nahradit celou tabulku při každém zpracování. Jedním z příkladů použití je výpočet souhrnu pomocí agregace:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

Předchozí příklad průběžně aktualizuje tabulku, která obsahuje agregovaný počet událostí podle zákazníka.

U aplikací s vyššími požadavky na latenci můžete ušetřit výpočetní prostředky pomocí jednorázových aktivačních událostí. Tyto možnosti slouží k aktualizaci souhrnných agregačních tabulek podle daného plánu a zpracování pouze nových dat, která přišla od poslední aktualizace.

Upsertování ze streamovaných dotazů pomocí foreachBatch

Můžete použít kombinaci merge a foreachBatch k zápisu složitých upsertů ze streamovacího dotazu do tabulky Delta. Viz Použití příkazu foreachBatch k zápisu do libovolných datových jímek.

Tento model má mnoho aplikací, včetně následujících:

  • Zápis streamových agregací v režimu aktualizace: To je mnohem efektivnější než režim úplného zpracování.
  • Zápis streamu změn databáze do tabulky Delta: Dotaz na sloučení pro zápis změnových dat lze použít v foreachBatch k nepřetržité aplikaci proudu změn do tabulky Delta.
  • Zápis datového proudu do tabulky Delta sodstranění duplicitních dat: slučovací dotaz jen pro vložení pro odstranění duplicitních dat lze použít v foreachBatch k průběžnému zápisu dat (s duplicitami) do tabulky Delta s automatickým odstraněním duplicitních dat.

Poznámka:

  • Ujistěte se, že příkaz merge uvnitř foreachBatch je idempotentní jako restartování streamovacího dotazu může operaci použít ve stejné dávce dat vícekrát.
  • Při merge použití v foreachBatchsestavě může být vstupní rychlost dat streamovaného dotazu (hlášená StreamingQueryProgress a viditelná v grafu rychlosti poznámkového bloku) hlášena jako násobek skutečné rychlosti, s jakou se data generují ve zdroji. Je to proto, že merge několikrát přečte vstupní data, což způsobuje násobení vstupních metrik. Pokud se jedná o kritický bod, můžete datový rámec dávky uložit do mezipaměti před merge a po merge uložení do mezipaměti zrušit.

Následující příklad ukazuje, jak můžete k provedení této úlohy použít SQL foreachBatch :

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Můžete se také rozhodnout použít rozhraní Delta Lake API k provádění přenosů streamování, jak je znázorněno v následujícím příkladu:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

idempotentní tabulka zapisuje do foreachBatch

Poznámka:

Databricks doporučuje konfigurovat samostatný zápis streamování pro každou jímku, kterou chcete aktualizovat, místo použití foreachBatch. Důvodem je to, že zápisy do více tabulek jsou serializovány při použití příkazu foreachBatch, což snižuje paralelizaci a zvyšuje celkovou latenci.

Tabulky Delta podporují následující možnosti DataFrameWriter, aby umožnily idempotentní zápisy do více tabulek v rámci foreachBatch.

  • txnAppId: Jedinečný řetězec, který můžete předat každému zápisu datového rámce. Můžete například použít ID StreamingQuery jako txnAppId.
  • txnVersion: Monotonicky rostoucí číslo, které funguje jako verze transakce.

Delta Lake používá kombinaci duplicitních txnAppId zápisů a txnVersion k jejich identifikaci a ignorování.

Pokud dojde k přerušení dávkového zápisu selháním, opětovným spuštěním dávky se použije stejná aplikace a ID dávky, aby modul runtime správně identifikoval duplicitní zápisy a ignoroval je. ID aplikace (txnAppId) může být libovolný jedinečný řetězec vygenerovaný uživatelem a nemusí souviset s ID datového proudu. Viz Použití příkazu foreachBatch k zápisu do libovolných datových jímek.

Upozorňující

Pokud odstraníte kontrolní bod streamování a restartujete dotaz s novým kontrolním bodem, musíte zadat jiný txnAppId. Nové kontrolní body začínají id dávky 0. Delta Lake používá ID dávky a txnAppId jako jedinečný klíč a přeskočí dávky s již viditelnými hodnotami.

Následující příklad kódu ukazuje tento vzor:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}