Delen via


Stateful verwerking optimaliseren in Delta Live Tables met watermerken

Gebruik watermerken bij het uitvoeren van stateful stroomverwerking in Delta Live Tables, waaronder aggregaties, joins en ontdubbeling om de gegevens effectief in staat te beheren. In dit artikel wordt beschreven hoe u watermerken gebruikt in uw Delta Live Tables-query's en voorbeelden bevat van de aanbevolen bewerkingen.

Notitie

Om ervoor te zorgen dat query's die aggregaties uitvoeren incrementeel worden verwerkt en niet volledig opnieuw worden aangevuld met elke update, moet u watermerken gebruiken.

Wat is een watermerk?

In streamverwerking is een watermerk een functie van Apache Spark die een tijdgebaseerde drempelwaarde kan definiëren voor het verwerken van gegevens bij het uitvoeren van stateful operaties, zoals aggregaties. Gegevens die binnenkomen, worden verwerkt totdat de drempelwaarde is bereikt, waarna het tijdvenster dat door de drempelwaarde is gedefinieerd, wordt gesloten. Watermerken kunnen worden gebruikt om problemen tijdens het verwerken van query's te voorkomen, voornamelijk bij het verwerken van grotere gegevenssets of langdurige verwerking. Deze problemen kunnen hoge latentie omvatten bij het produceren van resultaten en zelfs OOM-fouten (out-of-memory) vanwege de hoeveelheid gegevens die tijdens de verwerking in de status wordt bewaard. Omdat streaminggegevens inherent ongeordeld zijn, bieden watermerken ook ondersteuning voor het correct berekenen van bewerkingen zoals tijdvensteraggregaties.

Zie Watermerken in Apache Spark Structured Streaming en Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren voor meer informatie over het gebruik van watermerken in stroomverwerking.

Hoe definieert u een watermerk?

U definieert een watermerk door een tijdstempelveld en een waarde op te geven die de tijdsdrempel aangeeft voor late gegevens die binnenkomen. Gegevens worden te laat beschouwd als ze na de gedefinieerde tijdsdrempel binnenkomen. Als de drempelwaarde bijvoorbeeld is gedefinieerd als 10 minuten, kunnen records die na de drempelwaarde van 10 minuten binnenkomen, worden verwijderd.

Omdat records die na de gedefinieerde drempelwaarde binnenkomen, mogelijk worden verwijderd, is het selecteren van een drempelwaarde die voldoet aan uw latentie versus de vereisten voor juistheid belangrijk. Als u een kleinere drempelwaarde kiest, worden records sneller verzonden, maar betekent dit ook dat latere records waarschijnlijker worden verwijderd. Een grotere drempelwaarde betekent een langere wachttijd, maar mogelijk meer volledigheid van gegevens. Vanwege de grotere statusgrootte kan een grotere drempelwaarde ook extra rekenresources vereisen. Omdat de drempelwaarde afhankelijk is van uw vereisten voor gegevens en verwerking, is testen en bewaken van uw verwerking belangrijk om een optimale drempelwaarde te bepalen.

U gebruikt de functie withWatermark() in Python om een watermerk te definiëren. Gebruik in SQL de WATERMARK-component om een watermerk te definiëren:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Watermerken met stream-stream-joins gebruiken

Voor stream-stream joins moet u een watermerk aan beide zijden van de join en een tijdsintervalcomponent definiëren. Omdat elke joinbron een onvolledige weergave van de gegevens heeft, is de tijdsintervalcomponent vereist om de streaming-engine te laten weten wanneer er geen verdere overeenkomsten kunnen worden gemaakt. De tijdsintervalcomponent moet dezelfde velden gebruiken die worden gebruikt om de watermerken te definiëren.

Omdat voor elke stroom mogelijk verschillende drempelwaarden voor watermerken zijn vereist, hoeven de streams niet dezelfde drempelwaarden te hebben. Om ontbrekende gegevens te voorkomen, onderhoudt de streaming-engine één globaal watermerk op basis van de langzaamste stroom.

In het volgende voorbeeld wordt een stream met advertentie-indrukken samengevoegd en wordt er een stroom gebruikersklikken op advertenties weergegeven. In dit voorbeeld moet er binnen 3 minuten na de indruk een klik plaatsvinden. Nadat het tijdsinterval van 3 minuten is verstreken, worden rijen uit de status verwijderd die niet meer overeenkomen.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Vensteraggregaties uitvoeren met watermerken

Een algemene stateful bewerking voor streaminggegevens is een gevensterde aggregatie. Gevensterde aggregaties zijn vergelijkbaar met gegroepeerde aggregaties, behalve dat geaggregeerde waarden worden geretourneerd voor de set rijen die deel uitmaken van het gedefinieerde venster.

Een venster kan worden gedefinieerd als een bepaalde lengte en een aggregatiebewerking kan worden uitgevoerd op alle rijen die deel uitmaken van dat venster. Spark Streaming ondersteunt drie typen vensters:

  • Tumblingvensters (vaste) vensters: een reeks vaste, niet-overlappende en aaneengesloten tijdsintervallen. Een invoerrecord behoort tot slechts één venster.
  • Schuifvensters: Net als bij tumblingvensters zijn schuifvensters vaste grootte, maar vensters kunnen overlappen en kan een record in meerdere vensters vallen.

Wanneer gegevens voorbij het einde van het venster binnenkomen plus de lengte van het watermerk, worden er geen nieuwe gegevens geaccepteerd voor het venster, wordt het resultaat van de aggregatie verzonden en wordt de status voor het venster verwijderd.

In het volgende voorbeeld wordt elke 5 minuten een som van de weergaven berekend met behulp van een vast venster. In dit voorbeeld gebruikt de select-component de alias impressions_windowen wordt het venster zelf gedefinieerd als onderdeel van de GROUP BY-component. Het venster moet zijn gebaseerd op dezelfde tijdstempelkolom als het watermerk, de clickTimestamp kolom in dit voorbeeld.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Een vergelijkbaar voorbeeld in Python om de winst te berekenen in meer dan uur vaste vensters:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Streamingrecords ontdubbelen

Structured Streaming heeft precies eenmaal verwerkingsgaranties, maar ontdubbelt niet automatisch records uit gegevensbronnen. Omdat veel berichtenwachtrijen bijvoorbeeld ten minste eenmaal garanties hebben, moeten dubbele records worden verwacht bij het lezen van een van deze berichtenwachtrijen. U kunt de dropDuplicatesWithinWatermark() functie gebruiken om records op elk opgegeven veld te dedupliceren, zodat duplicaten uit een stroom worden verwijderd, zelfs als sommige velden verschillen (zoals gebeurtenistijd of aankomsttijd). U moet een watermerk opgeven om de functie dropDuplicatesWithinWatermark() te gebruiken. Alle dubbele gegevens die binnen het door het watermerk opgegeven tijdsbereik binnenkomen, worden verwijderd.

Geordende gegevens zijn belangrijk omdat out-of-order gegevens ervoor zorgen dat de watermerkwaarde verkeerd omhoog springt. Wanneer oudere gegevens binnenkomen, wordt dit beschouwd als laat en verwijderd. Gebruik de optie withEventTimeOrder om de eerste momentopname op volgorde te verwerken op basis van de tijdstempel die is opgegeven in het watermerk. De withEventTimeOrder optie kan worden gedeclareerd in de code die de gegevensset definieert of in de pijplijninstellingen met behulp van spark.databricks.delta.withEventTimeOrder.enabled. Bijvoorbeeld:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Notitie

De withEventTimeOrder optie wordt alleen ondersteund met Python.

In het volgende voorbeeld worden gegevens op volgorde van clickTimestampverwerkt en worden records die binnen 5 seconden na elkaar binnenkomen die dubbele userId bevatten en clickAdId kolommen worden verwijderd.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Pijplijnconfiguratie optimaliseren voor stateful verwerking

Om productieproblemen en overmatige latentie te voorkomen, raadt Databricks aan om statusbeheer op basis van RocksDB in te schakelen voor uw stateful stroomverwerking, met name als uw verwerking een grote hoeveelheid tussenliggende status vereist.

Serverloze pijplijnen beheren configuraties voor statusopslag automatisch.

U kunt statusbeheer op basis van RocksDB inschakelen door de volgende configuratie in te stellen voordat u een pijplijn implementeert:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Zie RocksDB-statusopslag configureren in Azure Databricks voor meer informatie over het statusarchief RocksDB, inclusief configuratieaanaanvelingen voor RocksDB.