Delen via


Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren

In dit artikel worden de basisconcepten van watermerken geïntroduceerd en worden aanbevelingen geboden voor het gebruik van watermerken in algemene stateful streamingbewerkingen. U moet watermerken toepassen op stateful streamingbewerkingen om te voorkomen dat de hoeveelheid gegevens die in de status worden bewaard oneindig wordt uitgebreid, waardoor geheugenproblemen kunnen optreden en de verwerkingslatenties tijdens langdurige streamingbewerkingen kunnen toenemen.

Wat is een watermerk?

Structured Streaming maakt gebruik van watermerken om de drempelwaarde te bepalen voor hoe lang updates voor een bepaalde statusentiteit moeten worden verwerkt. Veelvoorkomende voorbeelden van statusentiteiten zijn:

  • Aggregaties in een tijdvenster.
  • Unieke sleutels in een join tussen twee streams.

Wanneer u een watermerk declareert, geeft u een tijdstempelveld en een grenswaarde op voor een streaming DataFrame. Wanneer er nieuwe gegevens binnenkomen, houdt de statusbeheerder de meest recente tijdstempel in het opgegeven veld bij en verwerkt alle records binnen de drempelwaarde voor late tijd.

In het volgende voorbeeld wordt een drempelwaarde van 10 minuten toegepast op een venstertelling:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

In dit voorbeeld:

  • De kolom event_time wordt gebruikt om een watermerk van 10 minuten en een tumblingvenster van 5 minuten te definiëren.
  • Er wordt een telling verzameld voor elke id waargenomen voor elke niet-overlappende vensters van 5 minuten.
  • Statusinformatie wordt bijgehouden voor elke telling totdat het einde van het venster 10 minuten ouder is dan de meest recente waargenomen event_time.

Belangrijk

Drempelwaarden voor watermerken garanderen dat records die binnen de opgegeven drempelwaarde binnenkomen, worden verwerkt volgens de semantiek van de gedefinieerde query. Late binnenkomende records die buiten de opgegeven drempelwaarde binnenkomen, kunnen nog steeds worden verwerkt met behulp van metrische querygegevens, maar dit is niet gegarandeerd.

Hoe beïnvloeden watermerken de verwerkingstijd en doorvoer?

Watermerken communiceren met uitvoermodi om te bepalen wanneer gegevens naar de sink worden geschreven. Omdat watermerken de totale hoeveelheid statusgegevens verminderen die moeten worden verwerkt, is effectief gebruik van watermerken essentieel voor efficiënte stateful streamingdoorvoer.

Notitie

Niet alle uitvoermodi worden ondersteund voor alle stateful bewerkingen.

Watermerken en uitvoermodus voor gevensterde aggregaties

In de volgende tabel wordt de verwerking van query's met aggregatie op een tijdstempel met een watermerk gedefinieerd:

Uitvoermodus Gedrag
Toevoegen Rijen worden naar de doeltabel geschreven zodra de drempelwaarde voor het watermerk is verstreken. Alle schrijfbewerkingen worden vertraagd op basis van de drempelwaarde voor late tijd. De oude aggregatiestatus wordt verwijderd zodra de drempelwaarde is verstreken.
Bijwerken Rijen worden naar de doeltabel geschreven zodra de resultaten zijn berekend en kunnen worden bijgewerkt en overschreven zodra er nieuwe gegevens zijn. De oude aggregatiestatus wordt verwijderd zodra de drempelwaarde is verstreken.
Voltooid De aggregatiestatus wordt niet verwijderd. De doeltabel wordt herschreven met elke trigger.

Watermerken en uitvoer voor stream-stream-joins

Joins tussen meerdere streams ondersteunen alleen de toevoegmodus en overeenkomende records worden geschreven in elke batch die ze worden gedetecteerd. Voor inner joins raadt Databricks aan om een watermerkdrempel in te stellen voor elke streaminggegevensbron. Hierdoor kunnen statusgegevens worden verwijderd voor oude records. Zonder watermerken probeert Structured Streaming elke sleutel van beide zijden van de join samen te voegen met elke trigger.

Structured Streaming heeft speciale semantiek ter ondersteuning van outer joins. Watermerken zijn verplicht voor outer joins, zoals wordt aangegeven wanneer een sleutel moet worden geschreven met een null-waarde nadat deze niet overeenkomt. Hoewel outer joins nuttig kunnen zijn voor het opnemen van records die nooit overeenkomen tijdens gegevensverwerking, worden deze ontbrekende gegevens pas geregistreerd nadat de drempelwaarde voor late verwerkingstijd is verstreken, omdat joins alleen naar tabellen schrijven als toevoegingen.

De drempelwaarde voor late gegevens beheren met beleid voor meerdere watermerken in Gestructureerd streamen

Wanneer u met meerdere Structured Streaming-invoer werkt, kunt u meerdere watermerken instellen om tolerantiedrempels te beheren voor gegevens die te laat binnenkomen. Door watermerken te configureren, kunt u statusgegevens beheren en latentie beïnvloeden.

Een streamingquery kan meerdere invoerstromen hebben die zijn samengevoegd of samengevoegd. Elk van de invoerstromen kan een andere drempelwaarde hebben voor late gegevens die moeten worden getolereerd voor stateful bewerkingen. Geef deze drempelwaarden op met behulp withWatermarks("eventTime", delay) van elk van de invoerstromen. Hier volgt een voorbeeldquery met stream-stream joins.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Tijdens het uitvoeren van de query houdt Structured Streaming afzonderlijk de maximale gebeurtenistijd bij die in elke invoerstroom wordt waargenomen, berekent het de watermerken op basis van de bijbehorende vertraging en kiest het een enkel globaal watermerk dat voor stateful bewerkingen wordt gebruikt. Standaard wordt het minimum als globaal watermerk gekozen, omdat er geen gegevens per ongeluk als te laat worden verwijderd als een van de stromen achter de andere stromen valt (bijvoorbeeld een van de streams stopt met het ontvangen van gegevens vanwege upstreamfouten). Met andere woorden, het globale watermerk beweegt op een veilige manier in het tempo van de traagste stroom en de query-uitvoer wordt dienovereenkomstig vertraagd.

Als u snellere resultaten wilt krijgen, kunt u het beleid voor meerdere watermerken instellen om de maximumwaarde als globaal watermerk te kiezen door de SQL-configuratie in te stellen spark.sql.streaming.multipleWatermarkPolicy op max (standaard is min). Hierdoor kan het wereldwijde watermerk in het tempo van de snelste stroom worden verplaatst. Deze configuratie verwijdert echter gegevens uit de langzaamste stromen. Daarom raadt Databricks u aan deze configuratie zorgvuldig te gebruiken.

dubbele waarden binnen het watermerk verwijderen

In Databricks Runtime 13.3 LTS en hoger kunt u met behulp van een unieke ID records binnen een watermarkgrens ontdubbelen.

Structured Streaming biedt exactly-once verwerkingsgaranties, maar ontdubbelt niet automatisch records uit gegevensbronnen. U kunt dropDuplicatesWithinWatermark gebruiken om records op elk opgegeven veld te ontdubbelen, zodat u duplicaten uit een stroom kunt verwijderen, zelfs als sommige velden verschillen (zoals gebeurtenistijd of aankomsttijd).

Dubbele records die binnen het opgegeven watermerk binnenkomen, worden gegarandeerd verwijderd. Deze garantie is strikt in slechts één richting en dubbele records die buiten de opgegeven drempelwaarde aankomen, kunnen ook worden verwijderd. U moet de vertragingsdrempel van watermerk langer instellen dan de maximale tijdstempelverschillen tussen dubbele gebeurtenissen om alle duplicaten te verwijderen.

U moet een watermerk opgeven om de methode dropDuplicatesWithinWatermark te gebruiken, zoals in het volgende voorbeeld:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])