Tillämpa vattenstämplar för att kontrollera tröskelvärden för databehandling
Den här artikeln beskriver de grundläggande begreppen vattenstämpel och ger rekommendationer för att använda vattenstämplar i vanliga tillståndskänsliga strömningsåtgärder. Du måste använda vattenstämplar för tillståndskänsliga strömningsåtgärder för att undvika att oändligt utöka mängden data som lagras i tillståndet, vilket kan medföra minnesproblem och öka bearbetningens svarstider under långvariga strömningsåtgärder.
Vad är en watermark?
Strukturerad direktuppspelning använder vattenstämplar för att kontrollera tröskelvärdet för hur länge uppdateringar för en viss tillståndsentitet ska bearbetas. Vanliga exempel på tillståndsentiteter är:
- Aggregeringar över en tidsperiod window.
- Unika nycklar i en join mellan två strömmar.
När du deklarerar en watermarkanger du ett tidsstämpelfält och ett watermark tröskelvärde för en strömmande DataFrame. När nya data tas emot spårar tillståndshanteraren den senaste tidsstämpeln i det angivna fältet och bearbetar alla poster inom tröskelvärdet för fördröjning.
I följande exempel tillämpas ett tröskelvärde på 10 minuter watermark för ett antal fönster:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
I det här exemplet:
-
event_time
column används för att definiera en watermark på 10 minuter och en 5-minuters tumbling window. - Ett antal samlas in för varje
id
observerad för varje icke-överlappande 5-minutersfönster. - Tillståndsinformation bevaras för varje räkning ända tills window är 10 minuter äldre än den senaste observerade
event_time
.
Viktigt!
Watermark tröskelvärden garanterar att poster som kommer inom det angivna tröskelvärdet bearbetas enligt semantiken i den definierade frågan. För sent angivna poster som kommer utanför det angivna tröskelvärdet kan fortfarande bearbetas med hjälp av frågemått, men detta är inte garanterat.
Hur påverkar vattenstämplar bearbetningstiden och dataflödet?
Vattenstämplar interagerar med utdatalägen för att styra när data skrivs till mottagaren. Eftersom vattenstämplar minskar den totala mängden tillståndsinformation som ska bearbetas är effektiv användning av vattenstämplar avgörande för effektivt tillståndskänsligt strömmande dataflöde.
Kommentar
Alla utdatalägen stöds inte för alla tillståndskänsliga åtgärder.
Vattenstämplar och utdataläge för fönsteraggregeringar
Det följande table beskriver bearbetning av förfrågningar med aggregering på en tidsstämpel där en watermark är definierad.
Utdataläge | Funktionssätt |
---|---|
Lägga till | Rader skrivs till målobjektet table när tröskelvärdet för watermark har passerats. Alla skrivningar fördröjs baserat på tröskelvärdet för fördröjning. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat. |
Update | Rader skrivs till målet table när resultaten beräknas och kan uppdateras och skrivas över när nya data tas emot. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat. |
Klart | Sammansättningstillståndet tas inte bort. Målet table skrivs över med varje trigger. |
Vattenstämplar och utdata för strömströmanslutningar
Kopplingar mellan flera strömmar stöder endast tilläggsläge och matchade poster skrivs i varje batch som de identifieras. För inre kopplingar rekommenderar Databricks att du anger ett watermark tröskelvärde för varje strömmande datakälla. Detta gör att tillståndsinformation kan ignoreras för gamla poster. Utan vattenstämplar försöker Structured Streaming join varje nyckel från båda sidor av join med varje utlösare.
Strukturerad direktuppspelning har särskilda semantik för att stödja yttre kopplingar. Vattenstämpling är obligatoriskt för yttre kopplingar, eftersom det anger när en nyckel måste skrivas med ett null-värde efter att ha gått omatchat. Observera att även om yttre sammanfogningar kan vara användbara för att registrera poster som aldrig matchas under databehandlingen, eftersom sammanfogningar endast skrivs till tables som tilläggsoperationer, registreras inte dessa saknade data förrän fördröjningströskeln har passerat.
Kontrollera tröskelvärde för sena data med flera watermark policys i Structured Streaming
När du arbetar med flera indata för strukturerad direktuppspelning kan du set flera vattenstämplar för att kontrollera toleranströsklar för data som kommer sent. Genom att konfigurera vattenstämplar kan du styra tillståndsinformationen och påverka svarstiden.
En direktuppspelningsfråga kan ha flera indataströmmar som är sammankopplade eller kopplade. Var och en av indataströmmarna kan ha olika tröskelvärden för sena data som måste tolereras för tillståndskänsliga åtgärder. Ange dessa tröskelvärden med var withWatermarks("eventTime", delay)
och en av indataströmmarna. Följande är en exempelfråga med stream-stream-kopplingar.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
När du kör frågan spårar Structured Streaming individuellt den maximala tidstämpeln för händelser som visas i varje indataström, beräknar vattenstämplar baserat på motsvarande fördröjning och väljer en enda global watermark för dessa, för att användas för tillståndskänsliga operationer. Som standard väljs minimivärdet som global watermark eftersom det säkerställer att inga data oavsiktligt tas bort som för sent om en av strömmarna hamnar bakom de andra (till exempel slutar en av strömmarna att ta emot data på grund av överordnad fel). Med andra ord rör sig den globala watermark säkert i den långsammaste strömmens takt och frågeutdata fördröjs i enlighet därmed.
För att uppnå snabbare get resultat kan du använda set principen för flera watermark för att välja till det maximala värdet som global watermark genom att ange SQL-konfigurationen spark.sql.streaming.multipleWatermarkPolicy
till max
(standardvärdet är min
). På så sätt kan den globala watermark röra sig i den snabbaste strömmens takt. Den här konfigurationen släpper dock data från de långsammaste strömmarna. Därför rekommenderar Databricks att du använder den här konfigurationen på ett omdömesgillt sätt.
Släpp dubbletter inom watermark
I Databricks Runtime 13.3 LTS och senare kan du deduplicera poster inom ett watermark tröskelvärde med hjälp av en unik identifier.
Strukturerad direktuppspelning ger bearbetningsgarantier exakt en gång, men deduplicerar inte automatiskt poster från datakällor. Du kan använda dropDuplicatesWithinWatermark
för att deduplicera poster på vilket specifikt fält som helst, vilket gör att du kan remove dubbletter från en dataström även om vissa fält skiljer sig åt (till exempel händelsetid eller ankomsttid).
Dubbletter av poster som anländer inom den angivna watermark kommer garanterat att raderas. Den här garantin är strikt i endast en riktning, och dubbletter av poster som kommer utanför det angivna tröskelvärdet kan också tas bort. Du måste set tröskelvärdet för fördröjning på watermark längre än maximala tidsstämpelskillnader mellan duplicerade händelser för att remove alla dubbletter.
Du måste ange en watermark för att använda metoden dropDuplicatesWithinWatermark
, som i följande exempel:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])