Dela via


Delta table strömmande läsningar och skrivningar

Delta Lake är djupt integrerat med Spark Structured Streaming via readStream och writeStream. Delta Lake övervinner många av de begränsningar som vanligtvis är associerade med strömningssystem och filer, inklusive:

  • Sammansejsa små filer som genereras av inmatning med låg svarstid.
  • Underhåll av "exakt en gång"-bearbetning med mer än en ström (eller samtidiga batchjobb).
  • Identifiera effektivt vilka filer som är nya när du använder filer som källa för en dataström.

Kommentar

Den här artikeln beskriver hur du använder Delta Lake tables som strömningskällor och mottagare. Information om hur du läser in data med hjälp av strömmande tables i Databricks SQL finns i Läsa in data med strömmande tables i Databricks SQL.

Information om stream-static-kopplingar med Delta Lake finns i Stream-static joins (Stream-static joins).

Delta table som källa

Strukturerad direktuppspelning läser inkrementellt Delta tables. Medan en strömmande fråga är aktiv mot en Delta-tablebearbetas nya poster idempotent när nya table-versioner commiteras till källan table.

Följande kodexempel visar hur du konfigurerar en strömmande läsning genom att använda antingen namnet table eller filsökvägen.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Viktigt!

Om schema för en Delta-table ändras efter att en strömningsläsning börjar mot tablemisslyckas frågan. För de flesta schema ändringar kan du starta om strömmen för att lösa schema matchningsfel och fortsätta bearbetningen.

I Databricks Runtime 12.2 LTS och tidigare kan du inte strömma från en Delta-table med column-kartläggning aktiverad som har genomgått icke-additiv schema-evolution, såsom namnbyte eller borttagning av columns. Mer information finns i Streaming med column mappning och schema ändringar.

Limit indatafrekvens

Följande alternativ är tillgängliga för att styra mikrobatch:

  • maxFilesPerTrigger: Hur många nya filer som ska beaktas i varje mikrobatch. Standardvärdet är 1000.
  • maxBytesPerTrigger: Hur mycket data som bearbetas i varje mikrobatch. Det här alternativet anger ett "mjukt maxvärde", vilket innebär att en batch bearbetar ungefär den här mängden data och kan bearbeta mer än limit för att få strömningsfrågan att gå framåt i de fall då den minsta indataenheten är större än den här limit. Detta är inte set som standard.

Om du använder maxBytesPerTrigger tillsammans med maxFilesPerTriggerbearbetar mikrobatchprocessen data tills antingen maxFilesPerTrigger eller maxBytesPerTriggerlimit har nåtts.

Kommentar

I de fall då källans table-transaktioner rensas på grund av logRetentionDuration-konfiguration och strömningsfrågan försöker bearbeta dessa versioner misslyckas frågan som standard för att undvika dataförlust. Du kan set alternativet failOnDataLoss för att false ignorera förlorade data och fortsätta bearbetningen.

Strömma ett CDC-flöde (Change Data Capture) i Delta Lake

Delta Lake ändringsdataflöde registrerar ändringar i Delta table, inklusive uppdateringar och borttagningar. När den är aktiverad kan du strömma från ett ändringsdataflöde och utforma logik för att hantera infogningar, uppdateringar och borttagningar till nedströms tables. Även om datautdata för ändringsdataflöde skiljer sig något från Delta-table beskrivs, ger detta en lösning för att sprida inkrementella ändringar till underordnade tables i en medallion-arkitektur.

Viktigt!

I Databricks Runtime 12.2 LTS och tidigare går det inte att strömma från flödet för ändringsdata för en Delta-table med aktiverad column-mappning som har genomgått en schema-utveckling utan tillägg, till exempel när columnshar bytts namn på eller tagits bort. Se Streaming med column mappning och schema ändringar.

Ignorera uppdateringar och borttagningar

Strukturerad direktuppspelning hanterar inte indata som inte är en tilläggsfil och utlöser ett undantag om några ändringar sker på den table som används som källa. Det finns två huvudsakliga strategier för att hantera ändringar som inte kan spridas automatiskt nedströms:

  • Du kan ta bort utdata och kontrollpunkter och starta om strömmen från början.
  • Du kan set något av följande två alternativ:
    • ignoreDeletes: ignorera transaktioner som tar bort data vid partition gränser.
    • skipChangeCommits: ignorera transaktioner som tar bort eller ändrar befintliga poster. skipChangeCommits undersummor ignoreDeletes.

Kommentar

I Databricks Runtime 12.2 LTS och senare skipChangeCommits inaktuella föregående inställning ignoreChanges. I Databricks Runtime 11.3 LTS och lägre ignoreChanges är det enda alternativet som stöds.

Semantiken för ignoreChanges skiljer sig mycket från skipChangeCommits. När ignoreChanges är aktiverat återsänds omskrivna datafiler i källan table, såsom UPDATE, MERGE INTO, DELETE (inom partitioner) eller OVERWRITE, efter en databytesåtgärd. Oförändrade rader genereras ofta tillsammans med nya rader, så nedströmsanvändare måste kunna hantera dubbletter. Borttagningar sprids inte nedströms. ignoreChanges undersummor ignoreDeletes.

skipChangeCommits ignorerar filändringsåtgärder helt och hållet. Datafiler som skrivs om i källan table på grund av att data ändras, till exempel UPDATE, MERGE INTO, DELETEoch OVERWRITE ignoreras helt. För att återspegla ändringar i uppströmskällan tablesmåste du implementera separat logik för att sprida dessa ändringar.

Arbetsbelastningar som konfigurerats med ignoreChanges fortsätter att fungera med hjälp av kända semantik, men Databricks rekommenderar att du använder skipChangeCommits för alla nya arbetsbelastningar. Att migrera arbetsbelastningar med till ignoreChangesskipChangeCommits kräver refaktoriseringslogik.

Exempel

Anta till exempel att du har en tableuser_events med date, user_emailoch actioncolumns som partitioneras av date. Du strömmar ut från user_eventstable och du måste ta bort data från den på grund av GDPR.

När du tar bort vid partition gränser (dvs. WHERE finns på en partitioncolumn) segmenteras filerna redan efter värde, så borttagningen släpper bara filerna från metadata. När du tar bort en hel partition med data kan du använda följande:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Om du tar bort data i flera partitioner (i det här exemplet filtrering på user_email) använder du följande syntax:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Om du update en user_email med instruktionen UPDATE, skrivs filen som innehåller user_email om. Använd skipChangeCommits för att ignorera de ändrade datafilerna.

Ange inledande position

Du kan använda följande alternativ för att ange startpunkten för Delta Lake-strömningskällan utan att bearbeta hela table.

  • startingVersion: Delta Lake-versionen att börja från. Databricks rekommenderar att du utelämnar det här alternativet för de flesta arbetsbelastningar. När inte setstartar strömmen från den senaste tillgängliga versionen, inklusive en fullständig ögonblicksbild av table just då.

    Om det anges läser strömmen alla ändringar i Delta-table från och med den angivna versionen (inklusive). Om den angivna versionen inte längre är tillgänglig startar inte strömmen. Du kan hämta incheckningsversionerna från versioncolumn av DESCRIBE HISTORY kommandoutdata.

    Om du bara vill returnera de senaste ändringarna anger du latest.

  • startingTimestamp: Tidsstämpeln som ska startas från. Alla table ändringar som har gjorts vid eller efter tidsstämpeln (inklusive) läss av den strömmande läsaren. Om den angivna tidsstämpeln föregår alla table-kommandon börjar strömningsläsningen med den tidigast tillgängliga tidsstämpeln. En av:

    • En tidsstämpelsträng. Exempel: "2019-01-01T00:00:00.000Z"
    • En datumsträng. Exempel: "2019-01-01"

Du kan inte set båda alternativen samtidigt. De börjar gälla endast när du startar en ny direktuppspelningsfråga. Om en direktuppspelningsfråga har startats och förloppet har registrerats i kontrollpunkten ignoreras dessa alternativ.

Viktigt!

Även om du kan starta strömningskällan från en angiven version eller tidsstämpel är schema för strömningskällan alltid den senaste schema från Delta table. Du måste se till att det inte förekommer några inkompatibla schema-ändringar i table Delta efter den angivna versionen eller tidsstämpeln. Annars kan strömningskällan returnera felaktiga resultat när du läser data med en felaktig schema.

Exempel

Anta till exempel att du har en tableuser_events. Om du vill läsa ändringar sedan version 5 använder du:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Om du vill läsa ändringar sedan 2018-10-18 använder du:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Bearbeta den första ögonblicksbilden utan att data tas bort

Kommentar

Den här funktionen är tillgänglig på Databricks Runtime 11.3 LTS och senare. Den här funktionen finns som allmänt tillgänglig förhandsversion.

När du använder en Delta-table som en dataströmkälla bearbetar frågan först alla data som finns i table. Delta-table i den här versionen kallas den första ögonblicksbilden. Som standard bearbetas Delta tabledatafiler baserat på vilken fil som senast ändrades. Den senaste ändringstiden representerar dock inte nödvändigtvis posthändelsens tidsordning.

I en tillståndsberoende strömningsfråga med en definierad watermarkkan bearbetning av filer efter ändringstidpunkt leda till att poster bearbetas i fel ordning. Detta kan leda till att poster släpps som sena händelser av watermark.

Du kan undvika problemet med dataminskningen genom att aktivera följande alternativ:

  • withEventTimeOrder: Om den första ögonblicksbilden ska bearbetas med händelsetidsordning.

När händelsetidsordningen är aktiverad delas händelsetidsintervallet för inledande ögonblicksbildsdata in i tids bucketar. Varje mikrobatch bearbetar en bucket genom att filtrera data inom tidsintervallet. Konfigurationsalternativen maxFilesPerTrigger och maxBytesPerTrigger är fortfarande tillämpliga för att styra mikrobatchstorleken, men endast på ett ungefärligt sätt på grund av bearbetningens natur.

Bilden nedan visar den här processen:

Första ögonblicksbild

Viktig information om den här funktionen:

  • Problemet med dataminskning sker bara när den första Delta-ögonblicksbilden av en tillståndskänslig direktuppspelningsfråga bearbetas i standardordningen.
  • Du kan inte ändra withEventTimeOrder när dataströmfrågan har startats medan den första ögonblicksbilden fortfarande bearbetas. Om du vill starta om med withEventTimeOrder ändrat måste du ta bort kontrollpunkten.
  • Om du kör en dataströmfråga med medEventTimeOrder aktiverat kan du inte nedgradera den till en DBR-version som inte stöder den här funktionen förrän den första bearbetningen av ögonblicksbilder har slutförts. Om du behöver nedgradera kan du vänta tills den första ögonblicksbilden har slutförts eller ta bort kontrollpunkten och starta om frågan.
  • Den här funktionen stöds inte i följande ovanliga scenarier:
    • Händelsetiden column är en genererad column och det finns icke-projektionstransformationer mellan Delta-källan och watermark.
    • Det finns en watermark som har mer än en Delta-källa i dataströmfrågan.
  • När händelsetidsordningen är aktiverad kan prestandan för den inledande bearbetningen av deltaögonblicksbilder vara långsammare.
  • Varje mikrobatch genomsöker den första ögonblicksbilden för att filtrera data inom motsvarande händelsetidsintervall. För snabbare filteråtgärd rekommenderar vi att du använder en Delta-källa column som händelsetid så att datahoppning kan tillämpas (kontrollera Datahopp för Delta Lake för när det är tillämpligt). Dessutom kan table partitionering längs händelsetiden column påskynda bearbetningen ytterligare. Du kan kontrollera Spark-användargränssnittet för att se hur många deltafiler som genomsöks efter en specifik mikrobatch.

Exempel

Anta att du har en tableuser_events med en event_timecolumn. Din direktuppspelningsfråga är en aggregeringsfråga. Om du vill se till att inga data släpps under den första bearbetningen av ögonblicksbilder kan du använda:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Kommentar

Du kan också aktivera detta med Spark-konfiguration i klustret som gäller för alla strömmande frågor: spark.databricks.delta.withEventTimeOrder.enabled true

Delta table som uppsamlare

Du kan också skriva data till en Delta-table med structured streaming. Transaktionsloggen gör det möjligt för Delta Lake att garantera bearbetning exakt en gång, även om det finns andra strömmar eller batchfrågor som körs samtidigt mot table.

Kommentar

Funktionen Delta Lake VACUUM tar bort alla filer som inte hanteras av Delta Lake, men hoppar över alla kataloger som börjar med _. Du kan lagra kontrollpunkter på ett säkert sätt tillsammans med andra data och metadata för en Delta-table med hjälp av en katalogstruktur som <table-name>/_checkpoints.

Mått

Du kan ta reda på antalet byte och antalet filer som ännu inte har bearbetats i en strömmande frågeprocess som numBytesOutstanding mått och numFilesOutstanding . Ytterligare mått är:

  • numNewListedFiles: Antal Delta Lake-filer som listades för att beräkna kvarvarande uppgifter för den här batchen.
    • backlogEndOffset: Den table version som används för att beräkna backloggen.

Om du kör dataströmmen i en notebook-fil kan du se dessa mått under fliken Rådata på instrumentpanelen för strömningsfrågans förlopp:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Tilläggsläge

Som standard körs strömmar i tilläggsläge, vilket lägger till nya poster i table.

Använd metoden toTable när du strömmar till tables, som i följande exempel:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Fullständigt läge

Du kan också använda Structured Streaming för att ersätta hela table med varje batch. Ett exempel på användningsfall är att beräkna en sammanfattning med aggregering:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

I föregående exempel uppdateras kontinuerligt en table som innehåller det aggregerade antalet händelser per kund.

För program med mer överseende svarstidskrav kan du spara beräkningsresurser med engångsutlösare. Använd dessa för att update sammanfattningsaggregering tables enligt ett visst schema och bearbeta endast nya data som har anlänt sedan den senaste update.

Upsert från strömmande frågor med hjälp av foreachBatch

Du kan använda en kombination av merge och foreachBatch för att skriva komplexa upserts från en strömmande fråga till en Delta-table. Se Använda foreachBatch för att skriva till godtyckliga datamottagare.

Det här mönstret har många program, inklusive följande:

  • Skriv strömningsaggregeringar i läge Update: Detta är mycket effektivare än Komplett läge.
  • Skriva en dataström med databasändringar till en Delta-table: Den sammanslagningsfrågan för att skriva ändringsdata kan användas i foreachBatch för att kontinuerligt tillämpa en ström av ändringar på en Delta-table.
  • Skriv en dataström till Delta table med deduplicering: Den insert- endast sammanslagningsfrågan för deduplicering kan användas i foreachBatch för att kontinuerligt skriva data (med dubbletter) till en Delta-table med automatisk deduplicering.

Kommentar

  • Kontrollera att din merge instruktion inuti foreachBatch är idempotent eftersom omstarter av strömningsfrågan kan tillämpa åtgärden på samma databatch flera gånger.
  • När merge används i foreachBatchkan indatahastigheten för strömningsfrågan (rapporteras via StreamingQueryProgress och visas i diagrammet för notebook-hastighet) rapporteras som en multipel av den faktiska hastighet med vilken data genereras vid källan. Detta beror på att merge läser indata flera gånger vilket gör att indatamåtten multipliceras. Om det här är en flaskhals kan du cachelagra batchens dataram före merge och sedan ta bort cachelagring efter merge.

I följande exempel visas hur du kan använda SQL inom foreachBatch för att utföra den här uppgiften:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Du kan också välja att använda Delta Lake-API:erna för att utföra strömmande upserts, som i följande exempel:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Idempotent table skriver i foreachBatch

Kommentar

Databricks rekommenderar att du konfigurerar en separat direktuppspelningsskrivning för varje mottagare som du vill update. Om du använder foreachBatch för att skriva till flera tables, serialiseras skrivningarna, vilket minskar parallellisering och ökar den totala latenstiden.

Delta tables stöder följande DataFrameWriter alternativ för att göra skrivningar till flera tables inom foreachBatch idempotenta:

  • txnAppId: En unik sträng som du kan skicka på varje DataFrame-skrivning. Du kan till exempel använda StreamingQuery-ID:t som txnAppId.
  • txnVersion: Ett monotont ökande tal som fungerar som transaktionsversion.

Delta Lake använder kombinationen av txnAppId och txnVersion för att identifiera duplicerade skrivningar och ignorera dem.

Om en batchskrivning avbryts med ett fel använder omkörning av batchen samma program och batch-ID för att hjälpa körningen att identifiera duplicerade skrivningar korrekt och ignorera dem. Program-ID (txnAppId) kan vara en användargenererad unik sträng och behöver inte vara relaterad till ström-ID:t. Se Använda foreachBatch för att skriva till godtyckliga datamottagare.

Varning

Om du tar bort kontrollpunkten för direktuppspelning och startar om frågan med en ny kontrollpunkt måste du ange en annan txnAppId. Nya kontrollpunkter börjar med ett batch-ID för 0. Delta Lake använder batch-ID och txnAppId som en unik nyckel och hoppar över batchar med redan sett values.

Följande kodexempel visar det här mönstret:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}