Applicare limiti per controllare le soglie di elaborazione dati
Questo articolo presenta i concetti di base della filigrana e fornisce raccomandazioni per l'uso di filigrane nelle operazioni di streaming con stato comuni. È necessario applicare filigrane alle operazioni di streaming con stato per evitare di espandere infinitamente la quantità di dati mantenuti nello stato, che potrebbe introdurre problemi di memoria e aumentare le latenze di elaborazione durante le operazioni di streaming a esecuzione prolungata.
Che cos'è un watermark?
Structured Streaming usa filigrane per controllare la soglia per quanto tempo continuare a elaborare gli aggiornamenti per una determinata entità di stato. Esempi comuni di entità di stato includono:
- Aggregazioni in un periodo di tempo window.
- Chiavi univoce in un join tra due flussi.
Quando si dichiara un watermark, si specifica un campo timestamp e una soglia watermark su un DataFrame di streaming. Quando arrivano nuovi dati, il gestore di stato tiene traccia del timestamp più recente nel campo specificato ed elabora tutti i record entro la soglia di ritardo.
Nell'esempio seguente viene applicata una soglia di 10 minuti watermark a un conteggio finestrato:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
In questo esempio:
- Il
event_time
column viene usato per definire un watermark di 10 minuti e un windowrotante di 5 minuti. - Viene raccolto un conteggio per ogni
id
finestra di 5 minuti non sovrapposta. - Le informazioni sullo stato vengono mantenute per ogni conteggio fino a quando la fine di window non è di 10 minuti più vecchia rispetto all'ultimo
event_time
osservato.
Importante
Le soglie Watermark garantiscono che i record in arrivo entro la soglia specificata vengano elaborati secondo la semantica della query definita. I record in arrivo in ritardo che arrivano al di fuori della soglia specificata potrebbero comunque essere elaborati usando le metriche di query, ma questo non è garantito.
In che modo le filigrane influiscono sul tempo di elaborazione e sulla velocità effettiva?
Le filigrane interagiscono con le modalità di output per controllare quando i dati vengono scritti nel sink. Poiché le filigrane riducono la quantità totale di informazioni sullo stato da elaborare, un loro utilizzo efficace è essenziale per una velocità effettiva efficiente dello streaming con stato.
Nota
Non tutte le modalità di output sono supportate per tutte le operazioni con stato.
Filigrane e modalità di output per le aggregazioni finestrate
I seguenti dettagli table elaborano le query con aggregazione su un timestamp con un watermark definito:
Modalità output | Comportamento |
---|---|
Aggiunta | Le righe vengono scritte nel table di destinazione dopo il superamento della soglia di watermark. Tutte le scritture vengono ritardate in base alla soglia di ritardo. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia. |
Update | Le righe vengono scritte nella table di destinazione quando vengono calcolati i risultati e possono essere aggiornate e sovrascritte man mano che arrivano nuovi dati. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia. |
Completo | Lo stato dell'aggregazione non viene eliminato. Il table di destinazione viene riscritto ogni volta che viene attivato. |
Filigrane e output per i join di flusso di flusso
I join tra più flussi supportano solo la modalità di accodamento e i record corrispondenti vengono scritti in ogni batch individuati. Per gli inner join, Databricks consiglia di impostare una soglia watermark per ogni origine dei dati in streaming. Ciò consente di eliminare le informazioni sullo stato per i record precedenti. Senza filigrane, Structured Streaming tenta di join ogni chiave da entrambi i lati del join con ogni trigger.
Structured Streaming include una semantica speciale per supportare outer join. La filigrana è obbligatoria per i outer join, in quanto indica quando una chiave deve essere scritta con un valore Null dopo aver superato la corrispondenza. Si noti che mentre i outer join possono essere utili per registrare record che non corrispondono mai durante l'elaborazione dei dati, poiché i join scrivono solo in tables come operazioni di accodamento, questi dati mancanti non vengono registrati fino al superamento della soglia di ritardo.
Controllare la soglia dei dati tardivi con politiche multiple di watermark in Structured Streaming
Quando si usano più input di Structured Streaming, è possibile set più filigrane per controllare le soglie di tolleranza per i dati in arrivo in ritardo. La configurazione delle filigrane consente di controllare le informazioni sullo stato e influisce sulla latenza.
Una query di streaming può avere più flussi di input uniti o uniti. Ognuno dei flussi di input può avere una soglia diversa di dati tardivi che deve essere tollerata per le operazioni con stato. Specificare queste soglie usando withWatermarks("eventTime", delay)
in ognuno dei flussi di input. Di seguito è riportata una query di esempio con join di flusso di flusso.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Durante l'esecuzione della query, Structured Streaming tiene traccia singolarmente del massimo tempo di evento osservato in ogni flusso di input, calcola i watermark in base al ritardo corrispondente e sceglie un singolo watermark globale da utilizzare per le operazioni con stato. Per impostazione predefinita, il valore minimo viene scelto come watermark globale perché garantisce che nessun dato venga accidentalmente scartato perché troppo tardivo se uno dei flussi rimane indietro rispetto agli altri (ad esempio, uno dei flussi smette di ricevere dati a causa di problemi a monte). In altre parole, il watermark globale si sposta in modo sicuro al ritmo del flusso più lento e il risultato della query viene ritardato di conseguenza.
Se si desidera get risultati più veloci, è possibile set più criteri di watermark per scegliere il valore massimo come watermark globale impostando il spark.sql.streaming.multipleWatermarkPolicy
di configurazione SQL su max
(il valore predefinito è min
). Ciò consente al watermark globale di muoversi al ritmo della corrente più rapida. Tuttavia, questa configurazione elimina i dati dai flussi più lenti. Per questo motivo, Databricks consiglia di usare questa configurazione in modo succoso.
Eliminare i duplicati all'interno di watermark
In Databricks Runtime 13.3 LTS e versioni successive è possibile deduplicare i record all'interno di una soglia di watermark utilizzando un identifierunivoco.
Structured Streaming offre garanzie di elaborazione esattamente una volta, ma non deduplica automaticamente i record dalle origini dati. È possibile usare dropDuplicatesWithinWatermark
per deduplicare i record basandosi su qualsiasi campo specificato, consentendo di remove rimuovere i duplicati da un flusso anche se alcuni campi sono diversi, ad esempio l'ora dell'evento o l'ora di arrivo.
I record duplicati che arrivano all'interno del watermark specificato devono essere eliminati. Questa garanzia è rigorosa in una sola direzione e anche i record duplicati che arrivano al di fuori della soglia specificata potrebbero essere eliminati. È necessario set la soglia di ritardo di watermark in modo che sia più lunga delle massime differenze di timestamp tra gli eventi duplicati per remove tutti i duplicati.
È necessario specificare un watermark per usare il metodo dropDuplicatesWithinWatermark
, come nell'esempio seguente:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])