Selezionare una modalità di output per Structured Streaming
Questo articolo illustra la selezione di una modalità di output per lo streaming con stato. Solo i flussi con stato contenenti aggregazioni richiedono una configurazione della modalità di output.
I join supportano solo la modalità di output Append e la modalità di output non influisce sulla deduplicazione. Gli operatori arbitrari con stato mapGroupsWithState
e flatMapGroupsWithState
generano record usando la propria logica personalizzata, quindi la modalità di output del flusso non influisce sul comportamento.
Per lo streaming senza stato, tutte le modalità di output si comportano allo stesso modo.
Per configurare correttamente la modalità di output, è necessario comprendere lo streaming con stato, i limiti e i trigger. Fai riferimento ai seguenti articoli:
- Che cos'è lo streaming con stato?
- Applicare limiti per controllare le soglie di elaborazione dati
- Configurare gli intervalli di trigger di Structured Streaming
Che cos'è la modalità di output?
La modalità di output di una query Structured Streaming determina quali record generano gli operatori della query durante ogni trigger. I tre tipi di record che possono essere generati sono:
- I record che verranno elaborati in futuro non cambiano.
- Record modificati dall'ultimo trigger.
- Ottenere tutti i record nella tabella di stato.
Conoscere i tipi di record da generare è importante per gli operatori con stato perché una particolare riga prodotta da un operatore con stato potrebbe cambiare da trigger a trigger. Ad esempio, poiché un operatore di aggregazione di streaming riceve più righe per una determinata finestra, i valori di aggregazione della finestra potrebbero cambiare tra i trigger.
Per gli operatori senza stato, la distinzione tra i tipi di record non influisce sul comportamento dell'operatore. I record generati da un operatore senza stato durante un trigger sono sempre i record di origine elaborati durante tale trigger.
Modalità di output disponibili
Esistono tre modalità di output che indicano a un operatore quali record generare durante un determinato trigger:
Modalità di output | Descrizione |
---|---|
Modalità Append (impostazione predefinita) | Per impostazione predefinita, le query di streaming vengono eseguite in modalità Append. In questa modalità, gli operatori generano solo righe che non cambiano nei trigger futuri. Gli operatori con stato usano il limite per determinare quando si verifica questo problema. |
Modalità di aggiornamento | In modalità di aggiornamento, gli operatori generano tutte le righe modificate durante il trigger, anche se il record generato potrebbe cambiare in un trigger successivo. |
Modalità completa | La modalità completa funziona solo con le aggregazioni di streaming. In modalità completa, tutte le righe risultanti mai prodotte dall'operatore vengono generate a downstream. |
Considerazioni sulla produzione
Per molte operazioni di streaming con stato, è necessario scegliere tra le modalità Append e di aggiornamento. Le sezioni seguenti illustrano le considerazioni che potrebbero orientare la decisione.
Nota
La modalità completa include alcune applicazioni, ma può comportare prestazioni scarse quando i dati vengono ridimensionati. Databricks consiglia di usare viste materializzate per ottenere garanzie semantiche associate alla modalità completa con l'elaborazione incrementale per molte operazioni con stato. Vedere Utilizzare le viste materializzate in Databricks SQL.
Semantica dell'applicazione
La semantica dell'applicazione descrive come le applicazioni downstream usano i dati di streaming.
Se i servizi downstream devono eseguire una singola azione per ogni scrittura downstream, usare la modalità Append nella maggior parte dei casi. Ad esempio, se si dispone di un servizio di notifica downstream che invia notifiche per ogni nuovo record scritto nel sink, la modalità Append garantisce che ogni record venga scritto una sola volta. La modalità di aggiornamento scrive il record ogni volta che le informazioni sullo stato cambiano, determinando numerosi aggiornamenti.
Se i servizi downstream necessitano di risultati aggiornati, la modalità di aggiornamento garantisce che il sink rimanga aggiornato il più possibile. Gli esempi includono un modello di Machine Learning che legge le funzionalità in tempo reale o un dashboard di analisi che rileva le aggregazioni in tempo reale.
Compatibilità tra operatore e sink
Structured Streaming non supporta tutte le operazioni disponibili in Apache Spark e alcune operazioni di streaming non sono supportate in tutte le modalità di output. Per altre informazioni sulle limitazioni degli operatori, vedere la documentazione sullo streaming OSS.
Non tutti i sink supportano tutte le modalità di output. Entrambi Delta Lake, che gestisce tutte le tabelle gestite di Unity Catalog, e Kafka supportano tutte le modalità di output. Per altre informazioni sulla compatibilità del sink, vedere la documentazione sullo streaming OSS.
Latenza e costi
La modalità di output influisce sul tempo necessario prima di scrivere un record e la frequenza e la quantità di dati scritti possono influire sui costi associati alle pipeline di streaming.
La modalità Append forza gli operatori con stato a generare risultati solo dopo la finalizzazione dei risultati con stato, ovvero almeno fino al ritardo del limite. Un ritardo limite di 1 hour
nella modalità di output Append indica che i record hanno almeno un ritardo di 1 ora prima di essere emesso downstream.
La modalità di aggiornamento genera una scrittura per trigger per ogni valore di aggregazione. Se il sink viene addebitato per scrittura per record, questo può essere costoso se i record vengono aggiornati più volte prima che il ritardo del limite venga superato.
Esempi di configurazione
Gli esempi di codice seguenti illustrano la configurazione della modalità di output per lo streaming degli aggiornamenti alle tabelle del catalogo Unity:
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
Scala
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
Vedere la documentazione OSS per PySpark DataStreamWriter.outputMode o Scala DataStreamWriter.outputMode.
Esempio di modalità di streaming e output con stato
L'esempio seguente è progettato per aiutarti a ragionare con la modalità di output con limiti per lo streaming con stato.
Si consideri un'aggregazione di streaming che calcola i ricavi totali generati ogni ora in un archivio con un ritardo limite di 15 minuti. Il primo microbatch elabora i record seguenti:
- $ 15 alle 14:40
- $ 10 alle 14:30
- $ 30 alle 13:10
A questo punto, il limite del modulo è 14:55 perché sottrae 15 minuti (il ritardo) dal tempo massimo visto (15:10). L'operatore di aggregazione di streaming ha il seguente stato:
-
[2pm, 3pm]
: $ 25 -
[3pm, 4pm]
: $ 30
La tabella seguente descrive cosa accadrebbe in ogni modalità di output:
Modalità output | Risultato e motivo |
---|---|
Aggiunta | L'operatore di aggregazione di streaming non genera alcun elemento downstream. Questo perché entrambe queste finestre potrebbero cambiare man mano che vengono visualizzati nuovi valori con un trigger successivo: il limite delle 14:55 indica che i record dopo le 14:55 potrebbero ancora arrivare e tali record potrebbero rientrare nella finestra [2pm, 3pm] o nella finestra [3pm, 4pm] . |
Update | L'operatore genera entrambi i record, perché entrambi i record hanno ricevuto aggiornamenti. |
Completo | L'operatore genera tutti i record. |
Si supponga ora che il flusso riceva un altro record:
- $ 20 alle 15:20
Il limite viene aggiornato alle 15:05 perché il modulo sottrae 15 minuti dalle 15:20. A questo punto, l'operatore di aggregazione di streaming ha il seguente stato:
-
[2pm, 3pm]
: $ 25 -
[3pm, 4pm]
: $ 50
La tabella seguente descrive cosa accadrebbe in ogni modalità di output:
Modalità output | Risultato e motivo |
---|---|
Aggiunta | L'operatore di aggregazione di streaming osserva che il limite di 15:05 è maggiore della fine della finestra [2pm, 3pm] . Per definizione del limite, tale finestra non può più cambiare, quindi genera la finestra[2pm, 3pm] . |
Update | L'operatore di aggregazione di streaming genera la finestra [3pm, 4pm] perché il valore dello stato è cambiato da $ 30 a $ 50. |
Completo | L'operatore genera tutti i record. |
Di seguito viene riepilogato il comportamento degli operatori con stato in ogni modalità Append:
- In modalità Append scrivere record una volta dopo il ritardo del limite.
- In modalità di aggiornamento scrivere i record modificati dopo il trigger precedente.
- In modalità completa, scrivere tutti i record mai prodotti dall'operatore con stato.