Prüfpunkte für strukturiertes Streaming
Prüfpunkte und Write-Ahead-Protokolle gewähren durch ihre Zusammenarbeit Verarbeitungsgarantien für Workloads für strukturiertes Streaming. Der Prüfpunkt verfolgt die Informationen, die die Abfrage identifizieren, darunter Statusinformationen und verarbeitete Datensätze. Wenn Sie die Dateien in einem Prüfpunktverzeichnis löschen oder zu einem neuen Prüfpunkt-Speicherort wechseln, beginnt die nächste Ausführung der Abfrage von vorne.
Jede Abfrage muss einen anderen Prüfpunkt-Speicherort haben. Mehrere Abfragen sollten sich niemals denselben Speicherort teilen.
Aktivieren erweiterter Prüfpunkte für strukturierte Streaming-Abfragen
Sie müssen die Option checkpointLocation
angeben, bevor Sie eine Streamingabfrage ausführen, wie im folgenden Beispiel veranschaulicht:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Hinweis
Einige Senken, z. B. die Ausgabe für display()
in Notebooks und die Senke memory
, generieren automatisch einen temporären Prüfpunkt-Speicherort, wenn Sie diese Option weglassen. Die temporären Prüfpunkt-Speicherorte bieten keine Garantien für Fehlertoleranz oder Datenkonsistenz und werden unter Umständen nicht ordnungsgemäß bereinigt. Databricks empfiehlt, für diese Senken stets einen Prüfpunkt-Speicherort anzugeben.
Wiederherstellen nach Änderungen in einer Abfrage für strukturiertes Streaming
Es gibt Einschränkungen, welche Änderungen in einer Streamingabfrage zwischen Neustarts vom gleichen Punktspeicherort aus zulässig sind. Nachfolgend sind einige Änderungen angegeben, die entweder nicht zulässig oder deren Auswirkungen nicht klar definiert sind. Für alle gilt Folgendes:
- Der Begriff zulässig bedeutet, dass Sie die angegebene Änderung durchführen können, aber ob die Semantik ihrer Auswirkung klar definiert ist, hängt von der Abfrage und der Änderung ab.
- Der Begriff nicht zulässig bedeutet, dass Sie die angegebene Änderung nicht durchführen sollten, da bei der neu gestarteten Abfrage wahrscheinlich unvorhersehbare Fehler auftreten.
sdf
steht für einen Streamingdatenrahmen oder ein Streamingdataset, der bzw. das mitsparkSession.readStream
generiert wurde.
Arten von Änderungen in strukturierten Streaming-Abfragen
- Änderungen der Anzahl oder des Typs (d. h. eine unterschiedliche Quelle) von Eingabequellen: Dies ist nicht zulässig.
- Änderungen an den Parametern von Eingabequellen: Ob dies zulässig ist und ob die Semantik der Änderung klar definiert ist, hängt von der Quelle und der Abfrage ab. Nachstehend finden Sie einige Beispiele:
Das Hinzufügen, Löschen und Ändern von Ratenbegrenzungen ist zulässig:
spark.readStream.format("kafka").option("subscribe", "article")
zu
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Änderungen an abonnierten Artikeln und Dateien sind im Allgemeinen nicht zulässig, da die Ergebnisse unvorhersehbar sind:
spark.readStream.format("kafka").option("subscribe", "article")
inspark.readStream.format("kafka").option("subscribe", "newarticle")
- Änderungen im Auslöserintervall: Sie können Trigger zwischen inkrementellen Batches und Zeitintervallen ändern. Siehe Ändern von Auslöserintervallen zwischen Ausführungen.
- Änderungen am Typ der Ausgabesenke: Änderungen zwischen einigen bestimmten Senkenkombinationen sind zulässig. Dies muss von Fall zu Fall geprüft werden. Nachstehend finden Sie einige Beispiele:
- Dateisenke in Kafka-Senke ist zulässig. Für Kafka sind nur die neuen Daten sichtbar.
- Kafka-Senke in Dateisenke ist nicht zulässig.
- Die Änderung einer Kafka-Senke in eine foreach-Senke oder umgekehrt ist zulässig.
- Änderungen an den Parametern der Ausgabesenke: Ob dies zulässig ist und ob die Semantik der Änderung klar definiert ist, hängt von der Senke und der Abfrage ab. Nachstehend finden Sie einige Beispiele:
- Änderungen am Ausgabeverzeichnis einer Dateisenke sind nicht zulässig:
sdf.writeStream.format("parquet").option("path", "/somePath")
insdf.writeStream.format("parquet").option("path", "/anotherPath")
- Änderungen am Ausgabethema sind zulässig:
sdf.writeStream.format("kafka").option("topic", "topic1")
zusdf.writeStream.format("kafka").option("topic", "topic2")
- Änderungen an der benutzerdefinierten foreach-Senke (d. h. dem
ForeachWriter
-Code) sind zulässig, aber die Semantik der Änderung hängt vom Code ab.
- Änderungen am Ausgabeverzeichnis einer Dateisenke sind nicht zulässig:
- Änderungen an Projektions-/Filter-/Zuordnungsvorgängen: Einige Fälle sind zulässig. Beispiel:
- Das Hinzufügen/Löschen von Filtern ist zulässig:
sdf.selectExpr("a")
insdf.where(...).selectExpr("a").filter(...)
. - Änderungen in Projektionen mit gleichem Ausgabeschema sind zulässig:
sdf.selectExpr("stringColumn AS json").writeStream
insdf.select(to_json(...).as("json")).writeStream
. - Änderungen in Projektionen mit unterschiedlichem Ausgabeschema sind bedingt zulässig:
sdf.selectExpr("a").writeStream
insdf.selectExpr("b").writeStream
ist nur zulässig, wenn die Ausgabesenke das Ändern des Schemas von"a"
in"b"
zulässt.
- Das Hinzufügen/Löschen von Filtern ist zulässig:
- Änderungen an zustandsbehafteten Vorgängen: Bei einigen Vorgängen in Streamingabfragen müssen Zustandsdaten beibehalten werden, um das Ergebnis kontinuierlich zu aktualisieren. Beim strukturierten Streaming werden automatisch Prüfpunkte für die Zustandsdaten in einem fehlertoleranten Speicher erstellt (z. B. DBFS, Azure Blob Storage), und diese werden nach einem Neustart wiederhergestellt. Das setzt jedoch voraus, dass das Schema der Zustandsdaten bei allen Neustarts gleich bleibt. Das bedeutet, dass alle Änderungen (d. h. Hinzufügungen, Löschungen oder Schemaänderungen) an den zustandsbehafteten Vorgängen einer Streamingabfrage zwischen Neustarts nicht zulässig sind. Es folgt eine Liste der zustandsbehafteten Vorgänge, deren Schema zwischen Neustarts nicht geändert werden sollte, um die Zustandswiederherstellung zu gewährleisten:
- Streamingaggregation: Zum Beispiel
sdf.groupBy("a").agg(...)
. Änderungen der Anzahl oder des Typs von Gruppierungsschlüsseln oder Aggregaten sind nicht zulässig. - Streamingdeduplizierung: Zum Beispiel
sdf.dropDuplicates("a")
. Änderungen der Anzahl oder des Typs von Gruppierungsschlüsseln oder Aggregaten sind nicht zulässig. - Stream-stream-Join: Zum Beispiel
sdf1.join(sdf2, ...)
(d. h. beide Eingaben werden mitsparkSession.readStream
generiert). Änderungen des Schemas oder der Spalten mit Equi-Join sind nicht zulässig. Änderungen am Jointyp (äußerer oder innerer) sind nicht zulässig. Andere Änderungen in der Joinbedingung sind falsch definiert. - Beliebiger zustandsbehafteter Vorgang: Zum Beispiel
sdf.groupByKey(...).mapGroupsWithState(...)
odersdf.groupByKey(...).flatMapGroupsWithState(...)
. Änderungen am Schema des benutzerdefinierten Zustands und des Timeouttyps sind nicht zulässig. Änderungen innerhalb der Funktion für benutzerdefinierte Zustandszuordnung sind zulässig, aber die semantische Auswirkung der Änderung hängt von der benutzerdefinierten Logik ab. Wenn Sie Änderungen am Zustandsschema wirklich unterstützen möchten, können Sie Ihre komplexen Zustandsdatenstrukturen mithilfe eines Codierungs-/Decodierungsschemas, das die Schemamigration unterstützt, explizit in Bytes codieren/decodieren. Wenn Sie beispielsweise Ihren Status als Avro-codierte Bytes speichern, können Sie das Avro-State-Schema zwischen Abfrageneustarts ändern, da dies den binären Status wiederherstellt.
- Streamingaggregation: Zum Beispiel