Checkpoint di Structured Streaming
I checkpoint e i log write-ahead interagiscono per offrire garanzie di elaborazione per i carichi di lavoro Structured Streaming. Il checkpoint tiene traccia delle informazioni che identificano la query, incluse le informazioni sullo stato e i record elaborati. Quando si eliminano i file in una directory del checkpoint o si passa a un nuovo percorso del checkpoint, viene avviata la successiva esecuzione della query.
Ogni query deve avere un percorso di checkpoint diverso. Più query non devono mai condividere la stessa posizione.
Abilitare checkpoint avanzati per le query di Structured Streaming
È necessario specificare l'checkpointLocation
opzione prima di eseguire una query di streaming, come nell'esempio seguente:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Nota
Alcuni sink, ad esempio l'output per display()
nei notebook e il memory
sink, generano automaticamente una posizione di checkpoint temporanea se si omette questa opzione. Queste posizioni di checkpoint temporanee non garantiscono alcuna tolleranza di errore o garanzie di coerenza dei dati e potrebbero non essere pulite correttamente. Databricks consiglia di specificare sempre una posizione del checkpoint per questi sink.
Eseguire il ripristino dopo le modifiche in una query Structured Streaming
Esistono limitazioni sulle modifiche apportate a una query di streaming tra i riavvii dalla stessa posizione del checkpoint. Ecco alcune modifiche che non sono consentite o l'effetto della modifica non è ben definito. Per tutti:
- Il termine consentito significa che è possibile eseguire la modifica specificata, ma se la semantica del suo effetto è ben definita o no dipende dalla query e dalla modifica.
- Il termine non consentito indica che non è consigliabile eseguire la modifica specificata perché è probabile che la query riavviata abbia esito negativo con errori imprevedibili.
sdf
rappresenta un dataframe di streaming/set di dati generato consparkSession.readStream
.
Tipi di modifiche nelle query Structured Streaming
- Modifiche al numero o al tipo (ovvero origine diversa) delle origini di input: non è consentito.
- Modifiche nei parametri delle origini di input: indica se questa opzione è consentita e se la semantica della modifica è ben definita dipende dall'origine e dalla query. Ecco alcuni esempi.
È consentito aggiungere, eliminare e modificare i limiti di frequenza:
spark.readStream.format("kafka").option("subscribe", "article")
to
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Le modifiche apportate agli articoli e ai file sottoscritti non sono in genere consentite perché i risultati sono imprevedibili:
spark.readStream.format("kafka").option("subscribe", "article")
aspark.readStream.format("kafka").option("subscribe", "newarticle")
- Modifiche nell'intervallo di trigger: è possibile modificare i trigger tra batch incrementali e intervalli di tempo. Vedere Modifica degli intervalli di trigger tra le esecuzioni.
- Modifiche nel tipo di sink di output: sono consentite modifiche tra alcune combinazioni specifiche di sink. Questo deve essere verificato caso per caso. Ecco alcuni esempi.
- Il sink di file nel sink Kafka è consentito. Kafka vedrà solo i nuovi dati.
- Il sink Kafka nel sink di file non è consentito.
- Sink Kafka modificato in foreach o viceversa, è consentito.
- Modifiche nei parametri di sink di output: indica se questa opzione è consentita e se la semantica della modifica è ben definita dipende dall'origine e dalla query. Ecco alcuni esempi.
- Le modifiche apportate alla directory di output di un sink di file non sono consentite:
sdf.writeStream.format("parquet").option("path", "/somePath")
asdf.writeStream.format("parquet").option("path", "/anotherPath")
- Le modifiche apportate all'argomento di output sono consentite:
sdf.writeStream.format("kafka").option("topic", "topic1")
asdf.writeStream.format("kafka").option("topic", "topic2")
- Le modifiche apportate al sink foreach definito dall'utente(ovvero il
ForeachWriter
codice) sono consentite, ma la semantica della modifica dipende dal codice.
- Le modifiche apportate alla directory di output di un sink di file non sono consentite:
- Modifiche apportate alle operazioni di proiezione/filtro/mapping: alcuni casi sono consentiti. Ad esempio:
- L'aggiunta o l'eliminazione dei filtri è consentita:
sdf.selectExpr("a")
asdf.where(...).selectExpr("a").filter(...)
. - Le modifiche apportate alle proiezioni con lo stesso schema di output sono consentite:
sdf.selectExpr("stringColumn AS json").writeStream
asdf.select(to_json(...).as("json")).writeStream
. - Le modifiche apportate alle proiezioni con schema di output diverso sono consentite in modo condizionale:
sdf.selectExpr("a").writeStream
asdf.selectExpr("b").writeStream
è consentito solo se il sink di output permette la modifica dello schema da"a"
a"b"
.
- L'aggiunta o l'eliminazione dei filtri è consentita:
- Modifiche nelle operazioni con stato: alcune operazioni nelle query di streaming devono mantenere i dati sullo stato per aggiornare continuamente il risultato. Structured Streaming esegue automaticamente il checkpoint dei dati di stato nell'archiviazione a tolleranza di errore (ad esempio, DBFS, archiviazione BLOB di Azure) e lo ripristina dopo il riavvio. Tuttavia, si presuppone che lo schema dei dati di stato rimanga invariato tra i riavvii. Ciò significa che tutte le modifiche (ovvero aggiunte, eliminazioni o modifiche dello schema) alle operazioni con stato di una query di streaming non sono consentite tra i riavvii. Di seguito è riportato l'elenco delle operazioni con stato il cui schema non deve essere modificato tra i riavvii per garantire il ripristino dello stato:
- Aggregazione di streaming: ad esempio,
sdf.groupBy("a").agg(...)
. Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni. - Deduplicazione di streaming: ad esempio,
sdf.dropDuplicates("a")
. Non è consentito modificare il numero o il tipo di chiavi di raggruppamento o aggregazioni. - Join stream-stream: ad esempio,
sdf1.join(sdf2, ...)
(entrambi gli input vengono generati consparkSession.readStream
). Non sono consentite modifiche allo schema o alle colonne equi-join. Le modifiche apportate al tipo di join (esterno o interno) non sono consentite. Altre modifiche nella condizione di join non sono definite correttamente. - Operazione arbitraria con stato: ad esempio,
sdf.groupByKey(...).mapGroupsWithState(...)
osdf.groupByKey(...).flatMapGroupsWithState(...)
. Qualsiasi modifica allo schema dello stato definito dall'utente e il tipo di timeout non è consentito. Qualsiasi modifica all'interno della funzione di mapping dello stato definita dall'utente è consentita, ma l'effetto semantico della modifica dipende dalla logica definita dall'utente. Se si vuole effettivamente supportare le modifiche dello schema di stato, è possibile codificare/decodificare in modo esplicito le strutture di dati di stato complesse in byte usando uno schema di codifica/decodifica che supporta la migrazione dello schema. Ad esempio, se si salva lo stato come byte con codifica Avro, è possibile modificare lo schema di stato Avro tra i riavvii della query perché ripristina lo stato binario.
- Aggregazione di streaming: ad esempio,