Condividi tramite


Applicare limiti per controllare le soglie di elaborazione dati

Questo articolo presenta i concetti di base della filigrana e fornisce raccomandazioni per l'uso di filigrane nelle operazioni di streaming con stato comuni. È necessario applicare filigrane alle operazioni di streaming con stato per evitare di espandere infinitamente la quantità di dati mantenuti nello stato, che potrebbe introdurre problemi di memoria e aumentare le latenze di elaborazione durante le operazioni di streaming a esecuzione prolungata.

Che cos'è una filigrana?

Structured Streaming usa filigrane per controllare la soglia per quanto tempo continuare a elaborare gli aggiornamenti per una determinata entità di stato. Esempi comuni di entità di stato includono:

  • Aggregazioni in un intervallo di tempo.
  • Chiavi univoce in un collegamento tra due flussi.

Quando dichiari una filigrana in un dataframe di streaming, specifichi un campo di timestamp e una soglia. Quando arrivano nuovi dati, il gestore di stato tiene traccia del timestamp più recente nel campo specificato ed elabora tutti i record entro la soglia di ritardo.

Nell'esempio seguente viene applicata una soglia limite di 10 minuti a un conteggio finestrato:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

In questo esempio:

  • La colonna event_time viene usata per definire una filigrana di 10 minuti e una finestra a scorrimento fisso di 5 minuti.
  • Viene raccolto un conteggio per ogni id finestra di 5 minuti non sovrapposta.
  • Le informazioni sullo stato vengono mantenute per ogni conteggio fino a quando la fine della finestra non è più vecchia di 10 minuti rispetto all'ultima event_timeosservata.

Importante

Le soglie limite garantiscono che i record in arrivo entro la soglia specificata vengano elaborati in base alla semantica della query definita. I record in arrivo in ritardo che arrivano al di fuori della soglia specificata potrebbero comunque essere elaborati usando le metriche di query, ma questo non è garantito.

In che modo le filigrane influiscono sul tempo di elaborazione e sulla velocità effettiva?

Le filigrane interagiscono con le modalità di output per controllare quando i dati vengono scritti nel sink. Poiché le filigrane riducono la quantità totale di informazioni sullo stato da elaborare, un loro utilizzo efficace è essenziale per una velocità effettiva efficiente dello streaming con stato.

Nota

Non tutte le modalità di output sono supportate per tutte le operazioni con stato.

Filigrane e modalità di output per le aggregazioni finestrate

La tabella seguente illustra in dettaglio l'elaborazione delle query con aggregazione su un timestamp con una filigrana definita.

Modalità output Comportamento
Aggiunta Le righe vengono scritte nella tabella di destinazione dopo il superamento della soglia limite. Tutte le scritture vengono ritardate in base alla soglia di ritardo. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia.
Aggiornare Le righe vengono scritte nella tabella di destinazione man mano che i risultati vengono calcolati e possono essere aggiornate e sovrascritte man mano che arrivano nuovi dati. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia.
Completo Lo stato dell'aggregazione non viene eliminato. La tabella di destinazione viene riscritta con ogni trigger.

Filigrane e output per i join di flusso di flusso

I join tra più flussi supportano solo la modalità di accodamento e i record corrispondenti vengono scritti in ogni batch individuati. Per i inner join, Databricks consiglia di impostare una soglia limite per ogni origine dati di streaming. Ciò consente di eliminare le informazioni sullo stato per i record precedenti. Senza filigrane, Structured Streaming tenta di unire ogni chiave da entrambe le parti del join a ogni attivazione.

Structured Streaming include una semantica speciale per supportare outer join. La filigrana è obbligatoria per i outer join, in quanto indica quando una chiave deve essere scritta con un valore Null dopo aver superato la corrispondenza. Si noti che mentre gli outer join possono essere utili per registrare i record che non vengono mai abbinati durante l'elaborazione dei dati, poiché i join scrivono nelle tabelle solo come operazioni di accodamento, questi dati mancanti non vengono registrati finché non viene superata la soglia di lateness.

Controllare la soglia dei dati tardivi con più criteri limite in Structured Streaming

Quando si usano più input di Structured Streaming, è possibile impostare più filigrane per controllare le soglie di tolleranza per i dati in arrivo in ritardo. La configurazione delle filigrane consente di controllare le informazioni sullo stato e influisce sulla latenza.

Una query di streaming può avere più flussi di input uniti o uniti. Ognuno dei flussi di input può avere una soglia diversa di dati tardivi che deve essere tollerata per le operazioni con stato. Specificare queste soglie usando withWatermarks("eventTime", delay) in ognuno dei flussi di input. Di seguito è riportata una query di esempio con join di flusso di flusso.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Durante l'esecuzione della query, Structured Streaming tiene traccia singolarmente del tempo massimo di evento visualizzato in ogni flusso di input, calcola le filigrane in base al ritardo corrispondente e sceglie una singola filigrana globale con cui usarle per le operazioni con stato. Per impostazione predefinita, il valore minimo viene scelto come livello di riferimento globale perché garantisce che nessun dato venga accidentalmente eliminato come troppo tardi se uno dei flussi resta indietro rispetto agli altri (ad esempio, uno dei flussi interrompe la ricezione dei dati a causa di interruzioni a monte). In altre parole, la filigrana globale si sposta in modo sicuro al ritmo del flusso più lento e l'output della query viene quindi ritardato di conseguenza.

Se si desidera ottenere risultati più veloci, è possibile impostare la politica multipla di watermark per scegliere il valore massimo come watermark globale impostando il spark.sql.streaming.multipleWatermarkPolicy di configurazione SQL su max (il valore predefinito è min). In questo modo la filigrana globale si sposta al ritmo del flusso più veloce. Tuttavia, questa configurazione elimina i dati dai flussi più lenti. Per questo motivo, Databricks consiglia di usare questa configurazione in modo succoso.

Eliminare i duplicati all'interno della filigrana

In Databricks Runtime 13.3 LTS e versioni successive è possibile deduplicare i record all'interno di una soglia limite usando un identificatore univoco.

Structured Streaming offre garanzie di elaborazione esattamente una volta, ma non deduplica automaticamente i record dalle origini dati. È possibile usare dropDuplicatesWithinWatermark per deduplicare i record in qualsiasi campo specificato, consentendo di rimuovere i duplicati da un flusso anche se alcuni campi sono diversi, ad esempio l'ora dell'evento o l'ora di arrivo.

È garantito che i record duplicati che arrivano entro la filigrana specificata vengano eliminati. Questa garanzia è rigorosa in una sola direzione e anche i record duplicati che arrivano al di fuori della soglia specificata potrebbero essere eliminati. Per rimuovere tutti i duplicati, è necessario impostare la soglia di ritardo della filigrana superiore alla massima differenza di timestamp tra gli eventi duplicati.

È necessario specificare una filigrana per usare il metodo dropDuplicatesWithinWatermark, come nell'esempio seguente:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])