Optimera tillståndskänslig bearbetning i Delta Live Tables med vattenstämplar
Använd vattenstämplar när du utför tillståndskänslig dataströmbearbetning i Delta Live Tables, inklusive aggregeringar, kopplingar och deduplicering för att effektivt hantera data som lagras i tillståndet. Den här artikeln beskriver hur du använder vattenstämplar i dina Delta Live Tables-frågor och innehåller exempel på de rekommenderade åtgärderna.
Kommentar
För att säkerställa att frågor som utför aggregeringar bearbetas inkrementellt och inte helt omberäknas med varje uppdatering måste du använda vattenstämplar.
Vad är en vattenstämpel?
Vid dataströmbearbetning är en vattenstämpel en Apache Spark-funktion som kan definiera ett tidsbaserat tröskelvärde för bearbetning av data när tillståndskänsliga åtgärder som aggregeringar utförs. Data som anländer bearbetas tills tröskelvärdet har nåtts, då tidsfönstret som definieras av tröskelvärdet stängs. Vattenstämplar kan användas för att undvika problem vid frågebearbetning, främst vid bearbetning av större datamängder eller långvarig bearbetning. Dessa problem kan omfatta hög svarstid vid skapande av resultat och till och med OOM-fel (out-of-memory) på grund av mängden data som lagras i tillståndet under bearbetningen. Eftersom strömmande data i sig är osorterade stöder vattenstämplar också korrekt beräkning av åtgärder som aggregeringar i tidsfönster.
Mer information om hur du använder vattenstämplar i strömbearbetning finns i Vattenstämpling i Apache Spark Structured Streaming och Tillämpa vattenstämplar för att kontrollera tröskelvärden för databearbetning.
Hur definierar du en vattenstämpel?
Du definierar en vattenstämpel genom att ange ett tidsstämpelfält och ett värde som representerar tidströskeln för att sena data ska tas emot. Data anses vara sena om de tas emot efter det definierade tidströskelvärdet. Om tröskelvärdet till exempel definieras som 10 minuter kan poster som anländer efter tröskelvärdet på 10 minuter tas bort.
Eftersom poster som tas emot efter det definierade tröskelvärdet kan tas bort är det viktigt att välja ett tröskelvärde som uppfyller kraven på svarstid kontra korrekthet. Om du väljer ett mindre tröskelvärde genereras poster tidigare, men det innebär också att sena poster är mer benägna att tas bort. Ett större tröskelvärde innebär en längre väntetid men eventuellt mer fullständighet av data. På grund av den större tillståndsstorleken kan ett större tröskelvärde också kräva ytterligare beräkningsresurser. Eftersom tröskelvärdet beror på dina data- och bearbetningskrav är det viktigt att testa och övervaka bearbetningen för att fastställa ett optimalt tröskelvärde.
Du använder withWatermark()
funktionen i Python för att definiera en vattenstämpel. I SQL använder du WATERMARK
-satsen för att definiera en vattenstämpel:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Använda vattenstämplar med strömströmskopplingar
För strömströmskopplingar måste du definiera en vattenstämpel på båda sidor av kopplingen och en tidsintervallsats. Eftersom varje kopplingskälla har en ofullständig vy över data krävs tidsintervallsatsen för att meddela strömningsmotorn när inga ytterligare matchningar kan göras. Tidsintervallsatsen måste använda samma fält som används för att definiera vattenstämplarna.
Eftersom det kan finnas tillfällen då varje ström kräver olika tröskelvärden för vattenstämplar behöver strömmarna inte ha samma tröskelvärden. För att undvika data som saknas behåller strömningsmotorn en global vattenstämpel baserat på den långsammaste strömmen.
I följande exempel kopplas en ström av annonsvisningar och en ström av användare klickar på annonser. I det här exemplet måste ett klick ske inom 3 minuter efter exponeringen. När tidsintervallet på 3 minuter har passerat tas rader från tillståndet som inte längre kan matchas bort.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(LIVE.bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Utföra fönsteraggregeringar med vattenstämplar
En vanlig tillståndskänslig åtgärd för strömmande data är en fönsterad aggregering. Fönsteraggregeringar liknar grupperade aggregeringar, förutom att aggregeringsvärden returneras för den uppsättning rader som ingår i det definierade fönstret.
Ett fönster kan definieras som en viss längd och en aggregeringsåtgärd kan utföras på alla rader som ingår i fönstret. Spark Streaming stöder tre typer av fönster:
- Rullande (fasta) fönster: En serie tidsintervall med fast storlek, icke-överlappande och sammanhängande. En indatapost tillhör endast ett enda fönster.
- Skjutfönster: På samma sätt som rullande fönster är skjutfönster i fast storlek, men fönster kan överlappa varandra och en post kan falla i flera fönster.
När data kommer förbi slutet av fönstret plus vattenstämpelns längd godkänns inga nya data för fönstret, resultatet av aggregeringen genereras och tillståndet för fönstret tas bort.
I följande exempel beräknas en summa av visningar var 5:e minut med ett fast fönster. I det här exemplet använder select-satsen aliaset impressions_window
, och sedan definieras själva fönstret som en del av GROUP BY
-satsen. Fönstret måste baseras på samma tidsstämpelkolumn som vattenstämpeln clickTimestamp
, kolumnen i det här exemplet.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(LIVE.silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Ett liknande exempel i Python för att beräkna vinst över timsbegränsade fönster:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Deduplicera strömmande poster
Strukturerad direktuppspelning har bearbetningsgarantier exakt en gång, men avduplicerar inte automatiskt poster från datakällor. Eftersom många meddelandeköer till exempel har minst en gång garantier bör dubbletter av poster förväntas när du läser från en av dessa meddelandeköer. Du kan använda dropDuplicatesWithinWatermark()
funktionen för att av duplicera poster i ett angivet fält och ta bort dubbletter från en dataström även om vissa fält skiljer sig åt (till exempel händelsetid eller ankomsttid). Du måste ange en vattenstämpel för att använda dropDuplicatesWithinWatermark()
funktionen. Alla duplicerade data som tas emot inom det tidsintervall som anges av vattenstämpeln tas bort.
Ordnade data är viktiga eftersom data som inte är i ordning gör att vattenstämpelvärdet går fel. När äldre data sedan kommer anses de vara sena och borttagna. Använd alternativet withEventTimeOrder
för att bearbeta den första ögonblicksbilden i ordning baserat på tidsstämpeln som anges i vattenstämpeln. Alternativet withEventTimeOrder
kan deklareras i koden som definierar datauppsättningen eller i pipelineinställningarna med hjälp av spark.databricks.delta.withEventTimeOrder.enabled
. Till exempel:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Kommentar
Alternativet withEventTimeOrder
stöds endast med Python.
I följande exempel bearbetas data efter clickTimestamp
, och poster som kommer inom 5 sekunder från varandra som innehåller dubbletter userId
och clickAdId
kolumner tas bort.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("LIVE.rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optimera pipelinekonfigurationen för tillståndskänslig bearbetning
För att förhindra produktionsproblem och långa svarstider rekommenderar Databricks att du aktiverar RocksDB-baserad tillståndshantering för tillståndskänslig dataströmbearbetning, särskilt om bearbetningen kräver att du sparar en stor mängd mellanliggande tillstånd.
Severless-pipelines hanterar automatiskt konfigurationer för tillståndslager.
Du kan aktivera RocksDB-baserad tillståndshantering genom att ange följande konfiguration innan du distribuerar en pipeline:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Mer information om RocksDB-tillståndslagret, inklusive konfigurationsrekommendationer för RocksDB, finns i Konfigurera RocksDB-tillståndslager på Azure Databricks.