Dela via


Optimera tillståndskänslig bearbetning i Delta Live Tables med vattenstämplar

Använd vattenstämplar när du utför tillståndskänslig dataströmbearbetning i Delta Live Tables, inklusive aggregeringar, kopplingar och deduplicering för att effektivt hantera data som lagras i tillståndet. Den här artikeln beskriver hur du använder vattenstämplar i dina Delta Live Tables-frågor och innehåller exempel på de rekommenderade åtgärderna.

Kommentar

För att säkerställa att frågor som utför aggregeringar bearbetas inkrementellt och inte helt omberäknas med varje uppdatering måste du använda vattenstämplar.

Vad är en vattenstämpel?

Vid dataströmbearbetning är en vattenstämpel en Apache Spark-funktion som kan definiera ett tidsbaserat tröskelvärde för bearbetning av data när tillståndskänsliga åtgärder som aggregeringar utförs. Data som anländer bearbetas tills tröskelvärdet har nåtts, då tidsfönstret som definieras av tröskelvärdet stängs. Vattenstämplar kan användas för att undvika problem vid frågebearbetning, främst vid bearbetning av större datamängder eller långvarig bearbetning. Dessa problem kan omfatta hög svarstid vid skapande av resultat och till och med OOM-fel (out-of-memory) på grund av mängden data som lagras i tillståndet under bearbetningen. Eftersom strömmande data i sig är osorterade stöder vattenstämplar också korrekt beräkning av åtgärder som aggregeringar i tidsfönster.

Mer information om hur du använder vattenstämplar i strömbearbetning finns i Vattenstämpling i Apache Spark Structured Streaming och Tillämpa vattenstämplar för att kontrollera tröskelvärden för databearbetning.

Hur definierar du en vattenstämpel?

Du definierar en vattenstämpel genom att ange ett tidsstämpelfält och ett värde som representerar tidströskeln för att sena data ska tas emot. Data anses vara sena om de tas emot efter det definierade tidströskelvärdet. Om tröskelvärdet till exempel definieras som 10 minuter kan poster som anländer efter tröskelvärdet på 10 minuter tas bort.

Eftersom poster som tas emot efter det definierade tröskelvärdet kan tas bort är det viktigt att välja ett tröskelvärde som uppfyller kraven på svarstid kontra korrekthet. Om du väljer ett mindre tröskelvärde genereras poster tidigare, men det innebär också att sena poster är mer benägna att tas bort. Ett större tröskelvärde innebär en längre väntetid men eventuellt mer fullständighet av data. På grund av den större tillståndsstorleken kan ett större tröskelvärde också kräva ytterligare beräkningsresurser. Eftersom tröskelvärdet beror på dina data- och bearbetningskrav är det viktigt att testa och övervaka bearbetningen för att fastställa ett optimalt tröskelvärde.

Du använder withWatermark() funktionen i Python för att definiera en vattenstämpel. I SQL använder du WATERMARK -satsen för att definiera en vattenstämpel:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Använda vattenstämplar med strömströmskopplingar

För strömströmskopplingar måste du definiera en vattenstämpel på båda sidor av kopplingen och en tidsintervallsats. Eftersom varje kopplingskälla har en ofullständig vy över data krävs tidsintervallsatsen för att meddela strömningsmotorn när inga ytterligare matchningar kan göras. Tidsintervallsatsen måste använda samma fält som används för att definiera vattenstämplarna.

Eftersom det kan finnas tillfällen då varje ström kräver olika tröskelvärden för vattenstämplar behöver strömmarna inte ha samma tröskelvärden. För att undvika data som saknas behåller strömningsmotorn en global vattenstämpel baserat på den långsammaste strömmen.

I följande exempel kopplas en ström av annonsvisningar och en ström av användare klickar på annonser. I det här exemplet måste ett klick ske inom 3 minuter efter exponeringen. När tidsintervallet på 3 minuter har passerat tas rader från tillståndet som inte längre kan matchas bort.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Utföra fönsteraggregeringar med vattenstämplar

En vanlig tillståndskänslig åtgärd för strömmande data är en fönsterad aggregering. Fönsteraggregeringar liknar grupperade aggregeringar, förutom att aggregeringsvärden returneras för den uppsättning rader som ingår i det definierade fönstret.

Ett fönster kan definieras som en viss längd och en aggregeringsåtgärd kan utföras på alla rader som ingår i fönstret. Spark Streaming stöder tre typer av fönster:

  • Rullande (fasta) fönster: En serie tidsintervall med fast storlek, icke-överlappande och sammanhängande. En indatapost tillhör endast ett enda fönster.
  • Skjutfönster: På samma sätt som rullande fönster är skjutfönster i fast storlek, men fönster kan överlappa varandra och en post kan falla i flera fönster.

När data kommer förbi slutet av fönstret plus vattenstämpelns längd godkänns inga nya data för fönstret, resultatet av aggregeringen genereras och tillståndet för fönstret tas bort.

I följande exempel beräknas en summa av visningar var 5:e minut med ett fast fönster. I det här exemplet använder select-satsen aliaset impressions_window, och sedan definieras själva fönstret som en del av GROUP BY -satsen. Fönstret måste baseras på samma tidsstämpelkolumn som vattenstämpeln clickTimestamp , kolumnen i det här exemplet.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Ett liknande exempel i Python för att beräkna vinst över timsbegränsade fönster:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplicera strömmande poster

Strukturerad direktuppspelning har bearbetningsgarantier exakt en gång, men avduplicerar inte automatiskt poster från datakällor. Eftersom många meddelandeköer till exempel har minst en gång garantier bör dubbletter av poster förväntas när du läser från en av dessa meddelandeköer. Du kan använda dropDuplicatesWithinWatermark() funktionen för att av duplicera poster i ett angivet fält och ta bort dubbletter från en dataström även om vissa fält skiljer sig åt (till exempel händelsetid eller ankomsttid). Du måste ange en vattenstämpel för att använda dropDuplicatesWithinWatermark() funktionen. Alla duplicerade data som tas emot inom det tidsintervall som anges av vattenstämpeln tas bort.

Ordnade data är viktiga eftersom data som inte är i ordning gör att vattenstämpelvärdet går fel. När äldre data sedan kommer anses de vara sena och borttagna. Använd alternativet withEventTimeOrder för att bearbeta den första ögonblicksbilden i ordning baserat på tidsstämpeln som anges i vattenstämpeln. Alternativet withEventTimeOrder kan deklareras i koden som definierar datauppsättningen eller i pipelineinställningarna med hjälp av spark.databricks.delta.withEventTimeOrder.enabled. Till exempel:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Kommentar

Alternativet withEventTimeOrder stöds endast med Python.

I följande exempel bearbetas data efter clickTimestamp, och poster som kommer inom 5 sekunder från varandra som innehåller dubbletter userId och clickAdId kolumner tas bort.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimera pipelinekonfigurationen för tillståndskänslig bearbetning

För att förhindra produktionsproblem och långa svarstider rekommenderar Databricks att du aktiverar RocksDB-baserad tillståndshantering för tillståndskänslig dataströmbearbetning, särskilt om bearbetningen kräver att du sparar en stor mängd mellanliggande tillstånd.

Severless-pipelines hanterar automatiskt konfigurationer för tillståndslager.

Du kan aktivera RocksDB-baserad tillståndshantering genom att ange följande konfiguration innan du distribuerar en pipeline:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Mer information om RocksDB-tillståndslagret, inklusive konfigurationsrekommendationer för RocksDB, finns i Konfigurera RocksDB-tillståndslager på Azure Databricks.