Stateful verwerking optimaliseren in Delta Live Tables met watermerken
Gebruik watermerken bij het uitvoeren van stateful stroomverwerking in Delta Live Tables, waaronder aggregaties, joins en ontdubbeling om de gegevens effectief in staat te beheren. In dit artikel wordt beschreven hoe u watermerken gebruikt in uw Delta Live Tables-query's en voorbeelden bevat van de aanbevolen bewerkingen.
Notitie
Om ervoor te zorgen dat query's die aggregaties uitvoeren incrementeel worden verwerkt en niet volledig opnieuw worden aangevuld met elke update, moet u watermerken gebruiken.
Wat is een watermerk?
Bij stroomverwerking is een watermerk een Apache Spark-functie die een drempelwaarde op basis van tijd kan definiëren voor het verwerken van gegevens bij het uitvoeren van stateful bewerkingen zoals aggregaties. Gegevens die binnenkomen, worden verwerkt totdat de drempelwaarde is bereikt, waarna het tijdvenster dat door de drempelwaarde is gedefinieerd, wordt gesloten. Watermerken kunnen worden gebruikt om problemen tijdens het verwerken van query's te voorkomen, voornamelijk bij het verwerken van grotere gegevenssets of langdurige verwerking. Deze problemen kunnen hoge latentie omvatten bij het produceren van resultaten en zelfs OOM-fouten (out-of-memory) vanwege de hoeveelheid gegevens die tijdens de verwerking in de status wordt bewaard. Omdat streaminggegevens inherent ongeordeld zijn, bieden watermerken ook ondersteuning voor het correct berekenen van bewerkingen zoals tijdvensteraggregaties.
Zie Watermerken in Apache Spark Structured Streaming en Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren voor meer informatie over het gebruik van watermerken in stroomverwerking.
Hoe definieert u een watermerk?
U definieert een watermerk door een tijdstempelveld en een waarde op te geven die de tijdsdrempel aangeeft voor late gegevens die moeten binnenkomen. Gegevens worden te laat beschouwd als ze na de gedefinieerde tijdsdrempel binnenkomen. Als de drempelwaarde bijvoorbeeld is gedefinieerd als 10 minuten, kunnen records die na de drempelwaarde van 10 minuten binnenkomen, worden verwijderd.
Omdat records die na de gedefinieerde drempelwaarde binnenkomen, mogelijk worden verwijderd, is het selecteren van een drempelwaarde die voldoet aan uw latentie versus de vereisten voor juistheid belangrijk. Als u een kleinere drempelwaarde kiest, worden records sneller verzonden, maar betekent dit ook dat latere records waarschijnlijker worden verwijderd. Een grotere drempelwaarde betekent een langere wachttijd, maar mogelijk meer volledigheid van gegevens. Vanwege de grotere statusgrootte kan een grotere drempelwaarde ook extra rekenresources vereisen. Omdat de drempelwaarde afhankelijk is van uw vereisten voor gegevens en verwerking, is testen en bewaken van uw verwerking belangrijk om een optimale drempelwaarde te bepalen.
U gebruikt de withWatermark()
functie in Python om een watermerk te definiëren. Gebruik in SQL de WATERMARK
component om een watermerk te definiëren:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Watermerken gebruiken met stream-stream-joins
Voor stream-stream joins moet u een watermerk aan beide zijden van de join en een tijdsintervalcomponent definiëren. Omdat elke joinbron een onvolledige weergave van de gegevens heeft, is de tijdsintervalcomponent vereist om de streaming-engine te laten weten wanneer er geen verdere overeenkomsten kunnen worden gemaakt. De tijdsintervalcomponent moet dezelfde velden gebruiken die worden gebruikt om de watermerken te definiëren.
Omdat voor elke stroom mogelijk verschillende drempelwaarden voor watermerken zijn vereist, hoeven de streams niet dezelfde drempelwaarden te hebben. Om ontbrekende gegevens te voorkomen, onderhoudt de streaming-engine één globaal watermerk op basis van de langzaamste stroom.
In het volgende voorbeeld wordt een stream met advertentie-indrukken samengevoegd en wordt er een stroom gebruikersklikken op advertenties weergegeven. In dit voorbeeld moet er binnen 3 minuten na de indruk een klik plaatsvinden. Nadat het tijdsinterval van 3 minuten is verstreken, worden rijen uit de status verwijderd die niet meer overeenkomen.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(LIVE.bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Vensteraggregaties uitvoeren met watermerken
Een algemene stateful bewerking voor streaminggegevens is een gevensterde aggregatie. Gevensterde aggregaties zijn vergelijkbaar met gegroepeerde aggregaties, behalve dat geaggregeerde waarden worden geretourneerd voor de set rijen die deel uitmaken van het gedefinieerde venster.
Een venster kan worden gedefinieerd als een bepaalde lengte en een aggregatiebewerking kan worden uitgevoerd op alle rijen die deel uitmaken van dat venster. Spark Streaming ondersteunt drie typen vensters:
- Tumblingvensters (vaste) vensters: een reeks vaste, niet-overlappende en aaneengesloten tijdsintervallen. Een invoerrecord behoort tot slechts één venster.
- Schuifvensters: Net als bij tumblingvensters zijn schuifvensters vaste grootte, maar vensters kunnen overlappen en kan een record in meerdere vensters vallen.
Wanneer gegevens voorbij het einde van het venster binnenkomen plus de lengte van het watermerk, worden er geen nieuwe gegevens geaccepteerd voor het venster, wordt het resultaat van de aggregatie verzonden en wordt de status voor het venster verwijderd.
In het volgende voorbeeld wordt elke 5 minuten een som van de weergaven berekend met behulp van een vast venster. In dit voorbeeld gebruikt de select-component de alias impressions_window
en wordt het venster zelf gedefinieerd als onderdeel van de GROUP BY
component. Het venster moet zijn gebaseerd op dezelfde tijdstempelkolom als het watermerk, de clickTimestamp
kolom in dit voorbeeld.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(LIVE.silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Een vergelijkbaar voorbeeld in Python om de winst te berekenen in meer dan uur vaste vensters:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Streamingrecords ontdubbelen
Structured Streaming heeft precies eenmaal verwerkingsgaranties, maar ontdubbelt niet automatisch records uit gegevensbronnen. Omdat veel berichtenwachtrijen bijvoorbeeld ten minste eenmaal garanties hebben, moeten dubbele records worden verwacht bij het lezen van een van deze berichtenwachtrijen. U kunt de dropDuplicatesWithinWatermark()
functie gebruiken om records op elk opgegeven veld te dedupliceren, zodat duplicaten uit een stroom worden verwijderd, zelfs als sommige velden verschillen (zoals gebeurtenistijd of aankomsttijd). U moet een watermerk opgeven om de dropDuplicatesWithinWatermark()
functie te kunnen gebruiken. Alle dubbele gegevens die binnen het door het watermerk opgegeven tijdsbereik binnenkomen, worden verwijderd.
Geordende gegevens zijn belangrijk omdat out-of-ordergegevens ervoor zorgen dat de watermerkwaarde onjuist vooruit springt. Wanneer oudere gegevens binnenkomen, wordt dit beschouwd als laat en verwijderd. Gebruik de withEventTimeOrder
optie om de eerste momentopname op volgorde te verwerken op basis van de tijdstempel die is opgegeven in het watermerk. De withEventTimeOrder
optie kan worden gedeclareerd in de code die de gegevensset definieert of in de pijplijninstellingen met behulp van spark.databricks.delta.withEventTimeOrder.enabled
. Bijvoorbeeld:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Notitie
De withEventTimeOrder
optie wordt alleen ondersteund met Python.
In het volgende voorbeeld worden gegevens verwerkt op volgorde clickTimestamp
van en records die binnen 5 seconden van elkaar binnenkomen die dubbele userId
waarden bevatten en clickAdId
kolommen worden verwijderd.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("LIVE.rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Pijplijnconfiguratie optimaliseren voor stateful verwerking
Om productieproblemen en overmatige latentie te voorkomen, raadt Databricks aan om statusbeheer op basis van RocksDB in te schakelen voor uw stateful stroomverwerking, met name als uw verwerking een grote hoeveelheid tussenliggende status vereist.
Serverloze pijplijnen beheren configuraties voor statusopslag automatisch.
U kunt statusbeheer op basis van RocksDB inschakelen door de volgende configuratie in te stellen voordat u een pijplijn implementeert:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Zie RocksDB-statusopslag configureren in Azure Databricks voor meer informatie over het statusarchief RocksDB, inclusief configuratieaanaanvelingen voor RocksDB.