Ottimizzare l'elaborazione con stato nelle tabelle live Delta con filigrane
Per gestire in modo efficace i dati mantenuti nello stato, usare le filigrane quando si esegue l'elaborazione di flussi con stato in tabelle live Delta, incluse aggregazioni, join e deduplicazione. Questo articolo descrive come usare le filigrane nelle query di tabelle live Delta e include esempi delle operazioni consigliate.
Nota
Per garantire che le query che eseguono aggregazioni vengano elaborate in modo incrementale e non completamente ricalcolate con ogni aggiornamento, è necessario usare filigrane.
Che cos'è una filigrana?
Nell'elaborazione del flusso, una filigrana è una funzionalità di Apache Spark che può definire una soglia basata sul tempo per l'elaborazione dei dati durante l'esecuzione di operazioni con stato, ad esempio le aggregazioni. I dati in arrivo vengono elaborati fino a quando non viene raggiunta la soglia, al momento in cui l'intervallo di tempo definito dalla soglia viene chiuso. Le filigrane possono essere usate per evitare problemi durante l'elaborazione delle query, principalmente durante l'elaborazione di set di dati di dimensioni maggiori o l'elaborazione a esecuzione prolungata. Questi problemi possono includere una latenza elevata nella produzione di risultati e anche errori di memoria insufficiente a causa della quantità di dati mantenuti nello stato durante l'elaborazione. Poiché i dati di streaming sono intrinsecamente non ordinati, le filigrane supportano anche il calcolo corretto di operazioni come le aggregazioni della finestra temporale.
Per altre informazioni sull'uso delle filigrane nell'elaborazione del flusso, vedere Filigrana in Apache Spark Structured Streaming e Applicare filigrane per controllare le soglie di elaborazione dei dati.
Come si definisce una filigrana?
È possibile definire una filigrana specificando un campo timestamp e un valore che rappresenta la soglia di tempo per dati in ritardo all'arrivo. I dati sono considerati in ritardo se arrivano dopo la soglia temporale definita. Ad esempio, se la soglia è definita come 10 minuti, i record che arrivano dopo la soglia di 10 minuti potrebbero essere eliminati.
Poiché i record che arrivano dopo la soglia definita potrebbero essere eliminati, è importante selezionare una soglia che soddisfi i requisiti di latenza e correttezza. La scelta di una soglia più piccola comporta l'emissione dei record prima, ma significa anche che è più probabile che i record in ritardo vengano eliminati. Una soglia maggiore indica un'attesa più lunga, ma probabilmente più completa dei dati. A causa delle dimensioni dello stato maggiori, una soglia maggiore potrebbe richiedere anche risorse di calcolo aggiuntive. Poiché il valore soglia dipende dai requisiti di dati e elaborazione, il test e il monitoraggio dell'elaborazione sono importanti per determinare una soglia ottimale.
Usa la funzione withWatermark()
in Python per definire una filigrana. In SQL usare la clausola WATERMARK
per definire una filigrana:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Usare filigrane con join tra flussi
Per i join flusso-flusso, è necessario definire una filigrana su entrambi i lati del join e una clausola di intervallo temporale. Poiché ogni origine join ha una visione incompleta dei dati, la clausola dell'intervallo di tempo è necessaria per indicare al motore di streaming quando non è possibile fare altre corrispondenze. La clausola intervallo di tempo deve utilizzare gli stessi campi usati per definire le filigrane.
Poiché potrebbero verificarsi momenti in cui ogni flusso richiede soglie diverse per le filigrane, i flussi non devono avere le stesse soglie. Per evitare dati mancanti, il motore di streaming mantiene una filigrana globale basata sul flusso più lento.
L'esempio seguente unisce un flusso di impression pubblicitarie e un flusso di clic degli utenti sugli annunci. In questo esempio, un clic deve verificarsi entro 3 minuti dall'impressione. Dopo il passaggio dell'intervallo di tempo di 3 minuti, le righe dello stato che non possono più essere confrontate vengono eliminate.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Eseguire aggregazioni finestrate con filigrane
Un'operazione con stato comune sui dati di streaming è un'aggregazione con finestra. Le aggregazioni con finestra sono simili alle aggregazioni raggruppate, ad eccezione del fatto che per il set di righe che fanno parte della finestra definita vengono restituiti valori aggregati.
Una finestra può essere definita come una determinata lunghezza e un'operazione di aggregazione può essere eseguita su tutte le righe che fanno parte di tale finestra. Spark Streaming supporta tre tipi di finestre:
- Finestre a cascata (fisse): serie di intervalli di tempo fissi, non sovrapposti e contigui. Un record di input appartiene a una sola finestra.
- Finestre scorrevoli: analogamente alle finestre a cascata, le finestre scorrevoli sono a dimensione fissa, ma le finestre possono sovrapporsi e un record può rientrare in più finestre.
Quando i dati arrivano oltre la fine della finestra più la lunghezza della filigrana, non vengono accettati nuovi dati per la finestra, il risultato dell'aggregazione viene generato e lo stato per la finestra viene eliminato.
Questo esempio calcola una somma di impression ogni 5 minuti, usando una finestra fissa. In questo esempio, la clausola select usa l'alias impressions_window
e quindi la finestra stessa viene definita come parte della clausola GROUP BY
. La finestra deve essere basata sulla stessa colonna timestamp della filigrana, la colonna clickTimestamp
in questo esempio.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Un esempio simile in Python per calcolare il profitto su finestre fisse orarie:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Deduplicare i record di streaming
Structured Streaming offre garanzie di elaborazione esattamente una volta, ma non deduplica automaticamente i record dalle origini dati. Ad esempio, poiché molte code di messaggi hanno almeno una volta garanzie, i record duplicati devono essere previsti durante la lettura da una di queste code di messaggi. È possibile usare la dropDuplicatesWithinWatermark()
funzione per deduplicare i record in qualsiasi campo specificato, rimuovendo i duplicati da un flusso anche se alcuni campi differiscono , ad esempio l'ora dell'evento o l'ora di arrivo. È necessario specificare una filigrana per usare la funzione dropDuplicatesWithinWatermark()
. Tutti i dati duplicati che arrivano entro l'intervallo di tempo specificato dalla filigrana vengono eliminati.
I dati ordinati sono importanti perché i dati non ordinati possono far avanzare in modo errato il valore della filigrana. Quindi, quando arrivano dati meno recenti, vengono considerati in ritardo e eliminati. Usare l'opzione withEventTimeOrder
per elaborare lo snapshot iniziale in base al timestamp specificato nella filigrana. L'opzione withEventTimeOrder
può essere dichiarata nel codice che definisce il set di dati o nelle impostazioni della pipeline usando spark.databricks.delta.withEventTimeOrder.enabled
. Ad esempio:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Nota
L'opzione withEventTimeOrder
è supportata solo con Python.
Nell'esempio seguente, i dati vengono elaborati in base a clickTimestamp
e i record che arrivano entro 5 secondi l'uno dall'altro e contengono colonne duplicate userId
e clickAdId
vengono eliminati.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Ottimizzare la configurazione della pipeline per l'elaborazione con stato
Per evitare problemi di produzione e una latenza eccessiva, Databricks consiglia di abilitare la gestione dello stato basata su RocksDB per l'elaborazione del flusso con stato, in particolare se l'elaborazione richiede un notevole risparmio di una grande quantità di stato intermedio.
Le pipeline senza sever gestiscono automaticamente le configurazioni dell'archivio stati.
È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente prima di distribuire una pipeline:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Per altre informazioni sull'archivio stati di RocksDB, incluse le raccomandazioni di configurazione per RocksDB, vedere Configurare l'archivio stati di RocksDB in Azure Databricks.