Condividi tramite


Ottimizzare l'elaborazione con stato nelle tabelle live Delta con filigrane

Per gestire in modo efficace i dati mantenuti nello stato, usare le filigrane quando si esegue l'elaborazione di flussi con stato in tabelle live Delta, incluse aggregazioni, join e deduplicazione. Questo articolo descrive come usare le filigrane nelle query di tabelle live Delta e include esempi delle operazioni consigliate.

Nota

Per garantire che le query che eseguono aggregazioni vengano elaborate in modo incrementale e non completamente ricalcolate con ogni aggiornamento, è necessario usare filigrane.

Che cos'è una filigrana?

Nell'elaborazione del flusso, una filigrana è una funzionalità di Apache Spark che può definire una soglia basata sul tempo per l'elaborazione dei dati durante l'esecuzione di operazioni con stato, ad esempio le aggregazioni. I dati in arrivo vengono elaborati fino a quando non viene raggiunta la soglia, al momento in cui l'intervallo di tempo definito dalla soglia viene chiuso. Le filigrane possono essere usate per evitare problemi durante l'elaborazione delle query, principalmente durante l'elaborazione di set di dati di dimensioni maggiori o l'elaborazione a esecuzione prolungata. Questi problemi possono includere una latenza elevata nella produzione di risultati e anche errori di memoria insufficiente a causa della quantità di dati mantenuti nello stato durante l'elaborazione. Poiché i dati di streaming sono intrinsecamente non ordinati, le filigrane supportano anche il calcolo corretto di operazioni come le aggregazioni della finestra temporale.

Per altre informazioni sull'uso delle filigrane nell'elaborazione del flusso, vedere Filigrana in Apache Spark Structured Streaming e Applicare filigrane per controllare le soglie di elaborazione dei dati.

Come si definisce una filigrana?

È possibile definire una filigrana specificando un campo timestamp e un valore che rappresenta la soglia di tempo per dati in ritardo all'arrivo. I dati sono considerati in ritardo se arrivano dopo la soglia temporale definita. Ad esempio, se la soglia è definita come 10 minuti, i record che arrivano dopo la soglia di 10 minuti potrebbero essere eliminati.

Poiché i record che arrivano dopo la soglia definita potrebbero essere eliminati, è importante selezionare una soglia che soddisfi i requisiti di latenza e correttezza. La scelta di una soglia più piccola comporta l'emissione dei record prima, ma significa anche che è più probabile che i record in ritardo vengano eliminati. Una soglia maggiore indica un'attesa più lunga, ma probabilmente più completa dei dati. A causa delle dimensioni dello stato maggiori, una soglia maggiore potrebbe richiedere anche risorse di calcolo aggiuntive. Poiché il valore soglia dipende dai requisiti di dati e elaborazione, il test e il monitoraggio dell'elaborazione sono importanti per determinare una soglia ottimale.

Usa la funzione withWatermark() in Python per definire una filigrana. In SQL usare la clausola WATERMARK per definire una filigrana:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Usare filigrane con join tra flussi

Per i join flusso-flusso, è necessario definire una filigrana su entrambi i lati del join e una clausola di intervallo temporale. Poiché ogni origine join ha una visione incompleta dei dati, la clausola dell'intervallo di tempo è necessaria per indicare al motore di streaming quando non è possibile fare altre corrispondenze. La clausola intervallo di tempo deve utilizzare gli stessi campi usati per definire le filigrane.

Poiché potrebbero verificarsi momenti in cui ogni flusso richiede soglie diverse per le filigrane, i flussi non devono avere le stesse soglie. Per evitare dati mancanti, il motore di streaming mantiene una filigrana globale basata sul flusso più lento.

L'esempio seguente unisce un flusso di impression pubblicitarie e un flusso di clic degli utenti sugli annunci. In questo esempio, un clic deve verificarsi entro 3 minuti dall'impressione. Dopo il passaggio dell'intervallo di tempo di 3 minuti, le righe dello stato che non possono più essere confrontate vengono eliminate.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Eseguire aggregazioni finestrate con filigrane

Un'operazione con stato comune sui dati di streaming è un'aggregazione con finestra. Le aggregazioni con finestra sono simili alle aggregazioni raggruppate, ad eccezione del fatto che per il set di righe che fanno parte della finestra definita vengono restituiti valori aggregati.

Una finestra può essere definita come una determinata lunghezza e un'operazione di aggregazione può essere eseguita su tutte le righe che fanno parte di tale finestra. Spark Streaming supporta tre tipi di finestre:

  • Finestre a cascata (fisse): serie di intervalli di tempo fissi, non sovrapposti e contigui. Un record di input appartiene a una sola finestra.
  • Finestre scorrevoli: analogamente alle finestre a cascata, le finestre scorrevoli sono a dimensione fissa, ma le finestre possono sovrapporsi e un record può rientrare in più finestre.

Quando i dati arrivano oltre la fine della finestra più la lunghezza della filigrana, non vengono accettati nuovi dati per la finestra, il risultato dell'aggregazione viene generato e lo stato per la finestra viene eliminato.

Questo esempio calcola una somma di impression ogni 5 minuti, usando una finestra fissa. In questo esempio, la clausola select usa l'alias impressions_windowe quindi la finestra stessa viene definita come parte della clausola GROUP BY. La finestra deve essere basata sulla stessa colonna timestamp della filigrana, la colonna clickTimestamp in questo esempio.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Un esempio simile in Python per calcolare il profitto su finestre fisse orarie:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplicare i record di streaming

Structured Streaming offre garanzie di elaborazione esattamente una volta, ma non deduplica automaticamente i record dalle origini dati. Ad esempio, poiché molte code di messaggi hanno almeno una volta garanzie, i record duplicati devono essere previsti durante la lettura da una di queste code di messaggi. È possibile usare la dropDuplicatesWithinWatermark() funzione per deduplicare i record in qualsiasi campo specificato, rimuovendo i duplicati da un flusso anche se alcuni campi differiscono , ad esempio l'ora dell'evento o l'ora di arrivo. È necessario specificare una filigrana per usare la funzione dropDuplicatesWithinWatermark(). Tutti i dati duplicati che arrivano entro l'intervallo di tempo specificato dalla filigrana vengono eliminati.

I dati ordinati sono importanti perché i dati non ordinati possono far avanzare in modo errato il valore della filigrana. Quindi, quando arrivano dati meno recenti, vengono considerati in ritardo e eliminati. Usare l'opzione withEventTimeOrder per elaborare lo snapshot iniziale in base al timestamp specificato nella filigrana. L'opzione withEventTimeOrder può essere dichiarata nel codice che definisce il set di dati o nelle impostazioni della pipeline usando spark.databricks.delta.withEventTimeOrder.enabled. Ad esempio:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Nota

L'opzione withEventTimeOrder è supportata solo con Python.

Nell'esempio seguente, i dati vengono elaborati in base a clickTimestampe i record che arrivano entro 5 secondi l'uno dall'altro e contengono colonne duplicate userId e clickAdId vengono eliminati.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Ottimizzare la configurazione della pipeline per l'elaborazione con stato

Per evitare problemi di produzione e una latenza eccessiva, Databricks consiglia di abilitare la gestione dello stato basata su RocksDB per l'elaborazione del flusso con stato, in particolare se l'elaborazione richiede un notevole risparmio di una grande quantità di stato intermedio.

Le pipeline senza sever gestiscono automaticamente le configurazioni dell'archivio stati.

È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente prima di distribuire una pipeline:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Per altre informazioni sull'archivio stati di RocksDB, incluse le raccomandazioni di configurazione per RocksDB, vedere Configurare l'archivio stati di RocksDB in Azure Databricks.