Applicare limiti per controllare le soglie di elaborazione dati
Questo articolo presenta i concetti di base della filigrana e fornisce raccomandazioni per l'uso di filigrane nelle operazioni di streaming con stato comuni. È necessario applicare filigrane alle operazioni di streaming con stato per evitare di espandere infinitamente la quantità di dati mantenuti nello stato, che potrebbe introdurre problemi di memoria e aumentare le latenze di elaborazione durante le operazioni di streaming a esecuzione prolungata.
Che cos'è una filigrana?
Structured Streaming usa filigrane per controllare la soglia per quanto tempo continuare a elaborare gli aggiornamenti per una determinata entità di stato. Esempi comuni di entità di stato includono:
- Aggregazioni in un intervallo di tempo.
- Chiavi univoce in un join tra due flussi.
Quando dichiari una filigrana, specifichi un campo timestamp e una soglia limite in un dataframe di streaming. Quando arrivano nuovi dati, il gestore di stato tiene traccia del timestamp più recente nel campo specificato ed elabora tutti i record entro la soglia di ritardo.
Nell'esempio seguente viene applicata una soglia limite di 10 minuti a un conteggio finestrato:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
In questo esempio:
- La
event_time
colonna viene usata per definire una filigrana di 10 minuti e una finestra a cascata di 5 minuti. - Viene raccolto un conteggio per ogni
id
finestra di 5 minuti non sovrapposta. - Le informazioni sullo stato vengono mantenute per ogni conteggio fino a quando la fine della finestra non è superiore a 10 minuti rispetto all'ultima osservata
event_time
.
Importante
Le soglie limite garantiscono che i record in arrivo entro la soglia specificata vengano elaborati in base alla semantica della query definita. I record in arrivo in ritardo che arrivano al di fuori della soglia specificata potrebbero comunque essere elaborati usando le metriche di query, ma questo non è garantito.
In che modo le filigrane influiscono sul tempo di elaborazione e sulla velocità effettiva?
Le filigrane interagiscono con le modalità di output per controllare quando i dati vengono scritti nel sink. Poiché le filigrane riducono la quantità totale di informazioni sullo stato da elaborare, un loro utilizzo efficace è essenziale per una velocità effettiva efficiente dello streaming con stato.
Nota
Non tutte le modalità di output sono supportate per tutte le operazioni con stato.
Filigrane e modalità di output per le aggregazioni finestrate
La tabella seguente illustra in dettaglio l'elaborazione delle query con aggregazione in un timestamp con una filigrana definita:
Modalità output | Comportamento |
---|---|
Aggiunta | Le righe vengono scritte nella tabella di destinazione dopo il superamento della soglia limite. Tutte le scritture vengono ritardate in base alla soglia di ritardo. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia. |
Update | Le righe vengono scritte nella tabella di destinazione man mano che i risultati vengono calcolati e possono essere aggiornate e sovrascritte man mano che arrivano nuovi dati. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia. |
Completo | Lo stato dell'aggregazione non viene eliminato. La tabella di destinazione viene riscritta con ogni trigger. |
Filigrane e output per i join di flusso di flusso
I join tra più flussi supportano solo la modalità di accodamento e i record corrispondenti vengono scritti in ogni batch individuati. Per i inner join, Databricks consiglia di impostare una soglia limite per ogni origine dati di streaming. Ciò consente di eliminare le informazioni sullo stato per i record precedenti. Senza filigrane, Structured Streaming tenta di unire ogni chiave da entrambi i lati del join con ogni trigger.
Structured Streaming include una semantica speciale per supportare outer join. La filigrana è obbligatoria per i outer join, in quanto indica quando una chiave deve essere scritta con un valore Null dopo aver superato la corrispondenza. Si noti che mentre i outer join possono essere utili per registrare record che non corrispondono mai durante l'elaborazione dei dati, poiché i join scrivono solo nelle tabelle come operazioni di accodamento, questi dati mancanti non vengono registrati fino al superamento della soglia di ritardo.
Controllare la soglia dei dati tardivi con più criteri limite in Structured Streaming
Quando si usano più input di Structured Streaming, è possibile impostare più filigrane per controllare le soglie di tolleranza per i dati in arrivo in ritardo. La configurazione delle filigrane consente di controllare le informazioni sullo stato e influisce sulla latenza.
Una query di streaming può avere più flussi di input uniti o uniti. Ognuno dei flussi di input può avere una soglia diversa di dati tardivi che deve essere tollerata per le operazioni con stato. Specificare queste soglie usando withWatermarks("eventTime", delay)
in ognuno dei flussi di input. Di seguito è riportata una query di esempio con join di flusso di flusso.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Durante l'esecuzione della query, Structured Streaming tiene traccia singolarmente del tempo massimo di evento visualizzato in ogni flusso di input, calcola le filigrane in base al ritardo corrispondente e sceglie una singola filigrana globale con cui usarle per le operazioni con stato. Per impostazione predefinita, il valore minimo viene scelto come filigrana globale perché garantisce che nessun dato venga accidentalmente eliminato come troppo tardi se uno dei flussi cade dietro gli altri (ad esempio, uno dei flussi interrompe la ricezione dei dati a causa di errori upstream). In altre parole, la filigrana globale si sposta in modo sicuro al ritmo del flusso più lento e l'output della query viene ritardato di conseguenza.
Per ottenere risultati più veloci, è possibile impostare i criteri di filigrana multipli per scegliere il valore massimo come filigrana globale impostando la configurazione spark.sql.streaming.multipleWatermarkPolicy
SQL su max
(il valore predefinito è min
). In questo modo la filigrana globale si sposta al ritmo del flusso più veloce. Tuttavia, questa configurazione elimina i dati dai flussi più lenti. Per questo motivo, Databricks consiglia di usare questa configurazione in modo succoso.
Eliminare i duplicati all'interno della filigrana
In Databricks Runtime 13.3 LTS e versioni successive è possibile deduplicare i record all'interno di una soglia limite usando un identificatore univoco.
Structured Streaming offre garanzie di elaborazione esattamente una volta, ma non deduplica automaticamente i record dalle origini dati. È possibile usare dropDuplicatesWithinWatermark
per deduplicare i record in qualsiasi campo specificato, consentendo di rimuovere i duplicati da un flusso anche se alcuni campi sono diversi, ad esempio l'ora dell'evento o l'ora di arrivo.
È garantito che i record duplicati che arrivano all'interno della filigrana specificata vengano eliminati. Questa garanzia è rigorosa in una sola direzione e anche i record duplicati che arrivano al di fuori della soglia specificata potrebbero essere eliminati. Per rimuovere tutti i duplicati, è necessario impostare la soglia di ritardo della filigrana più lunga del numero massimo di differenze di timestamp tra gli eventi duplicati.
È necessario specificare una filigrana per usare il dropDuplicatesWithinWatermark
metodo , come nell'esempio seguente:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])