Freigeben über


Auswählen eines Ausgabemodus für strukturiertes Streaming

Dieser Artikel enthält Informationen zur Wahl eines Ausgabemodus für zustandsbehaftetes Streaming. Ein Ausgabemodus muss nur für zustandsbehaftete Streams mit Aggregationen konfiguriert werden.

Joins unterstützen nur den Anfügeausgabemodus, und der Ausgabemodus wirkt sich nicht auf die Deduplizierung aus. Die willkürlichen zustandsbehafteten Operatoren mapGroupsWithState und flatMapGroupsWithState geben Datensätze mithilfe einer eigenen benutzerdefinierten Logik aus, sodass sich der Ausgabemodus des Streams nicht auf ihr Verhalten auswirkt.

Beim zustandslosen Streaming verhalten sich alle Ausgabemodi gleich.

Um den Ausgabemodus korrekt konfigurieren zu können, müssen Sie mit zustandsbehaftetem Streaming sowie mit Wasserzeichen und Triggern vertraut sein. Weitere Informationen finden Sie in folgenden Artikeln:

Was ist der Ausgabemodus?

Der Ausgabemodus einer Abfrage für strukturiertes Streaming bestimmt, welche Datensätze die Abfrageoperatoren beim jeweiligen Trigger ausgeben. Drei Arten von Datensätzen können ausgegeben werden:

  • Datensätze, die bei der weiteren Verarbeitung nicht geändert werden.
  • Datensätze, die seit dem letzten Trigger geändert wurden.
  • Alle Datensätze in der Zustandstabelle.

Für zustandsbehaftete Operatoren ist es wichtig zu wissen, welche Arten von Datensätzen ausgegeben werden sollen, da sich eine bestimmte Zeile, die von einem zustandsbehafteten Operator erzeugt wird, von Trigger zu Trigger ändern kann. Wenn also beispielsweise ein Streamingaggregationsoperator weitere Zeilen für ein bestimmtes Fenster empfängt, ändern sich ggf. die Aggregationswerte dieses Fensters triggerübergreifend.

Bei zustandslosen Operatoren wirken sich unterschiedliche Datensatztypen nicht auf das Verhalten des Operators aus. Bei den Datensätzen, die ein zustandsloser Operator im Rahmen eines Triggers ausgibt, handelt es sich immer um die Quelldatensätze, die im Rahmen dieses Triggers verarbeitet werden.

Verfügbare Ausgabemodi

Es gibt drei Ausgabemodi, die einem Operator mitteilen, welche Datensätze im Rahmen eines bestimmten Triggers ausgegeben werden sollen:

Ausgabemodus Beschreibung
Anfügemodus (Standardeinstellung) Standardmäßig werden Streamingabfragen im Anfügemodus ausgeführt. In diesem Modus geben Operatoren nur Zeilen aus, die sich bei zukünftigen Triggern nicht ändern. Zustandsbehaftete Operatoren verwenden das Wasserzeichen, um zu bestimmen, wann dies der Fall ist.
Aktualisierungsmodus Im Aktualisierungsmodus geben Operatoren alle Zeilen aus, die sich im Rahmen des Triggers geändert haben, auch wenn sich der ausgegebene Datensatz womöglich bei einem späteren Trigger ändert.
Vollständiger Modus Der vollständige Modus funktioniert nur mit Streamingaggregationen. Im vollständigen Modus werden alle vom Operator erzeugten resultierenden Zeilen nachgelagert ausgegeben.

Produktionsüberlegungen

Bei vielen zustandsbehafteten Streamingvorgängen müssen Sie zwischen Anfüge- und Aktualisierungsmodus wählen. Die folgenden Abschnitte enthalten Überlegungen, an denen Sie sich bei Ihrer Entscheidung orientieren können.

Hinweis

Der vollständige Modus ist für bestimmte Fälle geeignet, schneidet aber unter Umständen nicht so gut ab, wenn die Daten skaliert werden. Databricks empfiehlt die Verwendung materialisierter Sichten, um semantische Garantien für den vollständigen Modus mit inkrementeller Verarbeitung für viele zustandsbehaftete Vorgänge zu erhalten. Weitere Informationen finden Sie unter Verwenden materialisierter Sichten in Databricks SQL.

Anwendungssemantik

Die Anwendungssemantik beschreibt, wie nachgelagerte Anwendungen die Streamingdaten verwenden.

Wenn nachgelagerte Dienste für jeden nachgelagerten Schreibvorgang eine einzelne Aktion ausführen müssen, empfiehlt sich in den meisten Fällen die Verwendung des Anfügemodus. Wenn Sie also beispielsweise über einen nachgelagerten Benachrichtigungsdienst verfügen, der Benachrichtigungen für jeden neuen Datensatz sendet, der in die Senke geschrieben wird, sorgt der Anfügemodus dafür, dass jeder Datensatz nur einmal geschrieben wird. Der Aktualisierungsmodus schreibt den Datensatz jedes Mal, wenn sich die Zustandsinformationen ändern, was zu zahlreichen Aktualisierungen führen würde.

Wenn nachgelagerte Dienste aktuelle Ergebnisse benötigen, sorgt der Aktualisierungsmodus dafür, dass Ihre Senke so aktuell wie möglich bleibt. Beispiele wären etwa ein Machine Learning-Modell, das Features in Echtzeit liest, oder ein Analysedashboard, das Echtzeitaggregate nachverfolgt.

Operator- und Senkenkompatibilität

Strukturiertes Streaming unterstützt nicht alle Vorgänge, die in Apache Spark verfügbar sind, und einige Streamingvorgänge werden nicht in allen Ausgabemodi unterstützt. Weitere Informationen zu Operatorbeschränkungen finden Sie in der OSS-Streamingdokumentation.

Nicht alle Ausgabemodi werden von allen Senken unterstützt. Sowohl Delta Lake (unterstützt alle verwalteten Unity Catalog-Tabellen) als auch Kafka unterstützen alle Ausgabemodi. Weitere Informationen zur Senkenkompatibilität finden Sie in der OSS-Streamingdokumentation.

Wartezeit und Kosten

Der Ausgabemodus wirkt sich darauf aus, wie viel Zeit bis zum Schreiben eines Datensatzes vergehen muss, und die Frequenz und Menge der geschriebenen Daten kann sich auf die Kosten im Zusammenhang mit Streamingpipelines auswirken.

Der Anfügemodus erzwingt, dass zustandsbehaftete Operatoren Ergebnisse erst ausgeben, wenn zustandsbehaftete Ergebnisse fertig gestellt wurden, was mindestens so lange dauert wie Ihre Wasserzeichenverzögerung. Bei einer Wasserzeichenverzögerung von 1 hour im Anfügeausgabemodus bedeutet, dass Ihre Datensätze mindestens eine Verzögerung von einer Stunde haben, bevor sie nachgelagert ausgegeben werden.

Der Aktualisierungsmodus resultiert in einem einzelnen Schreibvorgang pro Trigger und Aggregatwert. Falls bei Ihrer Senke Gebühren pro Schreibvorgang und Datensatz anfallen, ist es unter Umständen teuer, wenn Datensätze mehrmals aktualisiert werden, bis das Ende der Wasserzeichenverzögerung erreicht ist.

Konfigurationsbeispiele

Die folgenden Codebeispiele zeigen das Konfigurieren des Ausgabemodus für das Streaming von Aktualisierungen an Unity Catalog-Tabellen:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Weitere Informationen finden Sie in der OSS-Dokumentation unter PySpark DataStreamWriter.outputMode oder Scala DataStreamWriter.outputMode.

Beispiel für zustandsbehaftetes Streaming und Ausgabemodi

Das folgende Beispiel veranschaulicht die Interaktion des Ausgabemodus mit Wasserzeichen für zustandsbehaftetes Streaming.

Stellen Sie sich eine Streamingaggregation vor, die den Gesamtumsatz berechnet, der pro Stunde in einem Geschäft erzielt wurde, und dabei eine Wasserzeichenverzögerung von 15 Minuten verwendet. Im Rahmen des ersten Mikrobatch werden folgende Datensätze verarbeitet:

  • 15 USD um 14:40 Uhr
  • 10 USD um 14:30 Uhr
  • 30 USD um 15:10 Uhr

An diesem Punkt liegt das Wasserzeichen der Engine bei 14:55 Uhr, da 15 Minuten (die Verzögerung) vom höchsten vorhandenen Zeitwert (15:10 Uhr) abgezogen werden. Der Zustand des Streamingaggregationsoperators enthält Folgendes:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 30 USD

Die folgende Tabelle zeigt, was im jeweiligen Ausgabemodus passieren würde:

Ausgabemodus Ergebnis und Grund
Anfügen Der Streamingaggregationsoperator gibt nichts an nachgelagerte Komponenten aus. Das liegt daran, dass sich beide Fenster noch ändern können, wenn neue Werte mit einem nachfolgenden Trigger hinzukommen: Das Wasserzeichen von 14:55 Uhr bedeutet, dass möglicherweise noch Datensätze nach 14:55 Uhr eingehen, und diese Datensätze können entweder in das [2pm, 3pm]-Fenster oder in das [3pm, 4pm]-Fenster fallen.
Aktualisieren Der Operator gibt beide Datensätze aus, da beide Datensätze Aktualisierungen erhalten haben.
Abschließen Der Operator gibt alle Datensätze aus.

Angenommen, der Datenstrom empfängt noch einen weiteren Datensatz:

  • 20 USD um 15:20 Uhr

Das Wasserzeichen wird auf 15:05 Uhr aktualisiert, da die Engine 15 Minuten von 15:20 Uhr abzieht. Nun enthält der Zustand des Streamingaggregationsoperators Folgendes:

  • [2pm, 3pm]: 25 USD
  • [3pm, 4pm]: 50 USD

Die folgende Tabelle zeigt, was im jeweiligen Ausgabemodus passieren würde:

Ausgabemodus Ergebnis und Grund
Anfügen Der Streamingaggregationsoperator erkennt, dass das Wasserzeichen von 15:05 Uhr nach dem Ende des [2pm, 3pm]-Fensters liegt. Aufgrund der Definition des Wasserzeichens kann sich dieses Fenster nicht mehr ändern. Daher wird das [2pm, 3pm]-Fenster ausgegeben.
Aktualisieren Der Streamingaggregationsoperator gibt das [3pm, 4pm]-Fenster aus, da sich der Zustandswert von 30 USD in 50 USD geändert hat.
Abschließen Der Operator gibt alle Datensätze aus.

Die folgende Zusammenfassung veranschaulicht, wie sich zustandsbehaftete Operatoren im jeweiligen Anfügemodus verhalten:

  • Im Anfügemodus werden Datensätze nach der Wasserzeichenverzögerung einmal geschrieben.
  • Im Aktualisierungsmodus werden Datensätze, die sich seit dem vorherigen Auslöser geändert haben, geschrieben.
  • Im vollständigen Modus werden alle Datensätze geschrieben, die vom zustandsbehafteten Operator erstellt wurden.