Delen via


Select een uitvoermodus voor gestructureerd streamen

In dit artikel wordt beschreven hoe u een uitvoermodus selecteert voor stateful streaming. Alleen stateful streams met aggregaties vereisen een configuratie van de uitvoermodus.

Joins ondersteunen alleen de toevoeguitvoermodus en de uitvoermodus heeft geen invloed op ontdubbeling. De willekeurige stateful operators mapGroupsWithState en flatMapGroupsWithState verzenden records met behulp van hun eigen aangepaste logica, zodat de uitvoermodus van de stream geen invloed heeft op hun gedrag.

Voor stateless streaming gedragen alle uitvoermodi zich hetzelfde.

Als u de uitvoermodus correct wilt configureren, moet u stateful streaming, watermerken en triggers begrijpen. Zie de volgende artikelen:

Wat is de uitvoermodus?

De uitvoermodus van een Structured Streaming-query bepaalt welke records de operators van de query tijdens elke trigger verzenden. De drie typen records die kunnen worden verzonden, zijn:

  • Registreert dat toekomstige verwerking niet verandert.
  • De records die zijn gewijzigd sinds de laatste trigger.
  • Alle records in de status table.

Weten welke typen records moeten worden verzonden, is belangrijk voor stateful operators, omdat een bepaalde rij die door een stateful operator wordt geproduceerd, kan veranderen van trigger naar trigger. Als een operator voor streamingaggregatie bijvoorbeeld meer rijen ontvangt voor een bepaalde window, kan de aggregatie van windowvalues veranderen tussen triggers.

Voor stateless operators heeft het onderscheid tussen recordtypen geen invloed op het gedrag van de operator. De records die een staatloze operator verzendt tijdens een trigger, zijn altijd de bronrecords die tijdens die trigger worden verwerkt.

Beschikbare uitvoermodi

Er zijn drie uitvoermodi waarmee een operator aangeeft welke records moeten worden verzonden tijdens een bepaalde trigger:

Uitvoermodus Beschrijving
Toevoegmodus (standaard) Streamingquery's worden standaard uitgevoerd in de toevoegmodus. In deze modus verzenden operators alleen rijen die niet veranderen in toekomstige triggers. Stateful operators gebruiken de watermark om te bepalen wanneer dit gebeurt.
Update modus In update modus verzenden operators alle rijen die tijdens de trigger zijn gewijzigd, zelfs als de verzonden record in een volgende trigger kan veranderen.
Volledige modus De volledige modus werkt alleen met streamingaggregaties. In de volledige modus worden alle resulterende rijen die door de operator zijn geproduceerd, downstream verzonden.

Overwegingen voor productie

Voor veel stateful streaming-operations moet u kiezen tussen toevoeg- en update modi. In de volgende secties worden overwegingen beschreven waarmee u uw beslissing kunt bepalen.

Notitie

De volledige modus heeft sommige toepassingen, maar kan slecht presteren wanneer gegevens worden geschaald. Databricks raadt aan om materialized views te gebruiken voor de semantische garanties die get zijn gekoppeld aan de volledige modus met incrementele verwerking voor veel stateful bewerkingen. Zie Gerealiseerde views gebruiken in Databricks SQL.

Semantiek van toepassingen

Toepassingssemantiek beschrijft hoe downstreamtoepassingen gebruikmaken van de streaminggegevens.

Als downstreamservices één actie moeten ondernemen voor elke downstream-schrijfbewerking, gebruikt u in de meeste gevallen de toevoegmodus. Als u bijvoorbeeld een downstreammeldingsservice hebt die meldingen verzendt voor elke nieuwe record die naar de sink is geschreven, zorgt de toevoegmodus ervoor dat elke record slechts eenmaal wordt geschreven. Update modus schrijft de record telkens wanneer de statusinformatie verandert, wat zou leiden tot talloze updates.

Als downstreamservices nieuwe resultaten nodig hebben, zorgt update modus ervoor dat uw sink zo up-to-date mogelijk blijft. Voorbeelden zijn een machine learning-model dat functies in realtime leest of een analysedashboard waarmee realtime statistische gegevens worden bijgehouden.

Compatibiliteit van operatoren en sinks

Structured Streaming biedt geen ondersteuning voor alle bewerkingen die beschikbaar zijn in Apache Spark en sommige streamingbewerkingen worden niet ondersteund in alle uitvoermodi. Zie de OSS-streamingdocumenten voor meer informatie over operatorbeperkingen.

Niet alle sinks ondersteunen alle uitvoermodi. Zowel Delta Lake, dat alle Unity-Catalog-beheerde tablesondersteunt, als Kafka ondersteunen alle uitvoermodi. Zie de OSS-streamingdocumenten voor meer informatie over sinkcompatibiliteit.

Latentie en kosten

De uitvoermodus heeft invloed op de hoeveelheid tijd die moet worden verstreken voordat u een record schrijft. De frequentie en hoeveelheid geschreven gegevens kunnen van invloed zijn op de kosten die zijn gekoppeld aan streamingpijplijnen.

In de toevoegmodus forceren stateful operatoren om resultaten alleen uit te zenden nadat stateful resultaten zijn voltooid, wat ten minste zo lang is als uw watermark vertraging. Een watermark vertraging van 1 hour in de bijvoeg-uitvoermodus betekent dat uw records minstens een uur vertraging hebben voordat ze downstream worden verzonden.

Update modus resulteert in één schrijfbewerking per trigger per geaggregeerde waarde. Als uw sinkkosten per schrijf per record in rekening worden gebracht, kan dit duur zijn als records vaak update voordat de watermark vertraging is verstreken.

Configuratievoorbeelden

In de volgende codevoorbeelden ziet u hoe u de uitvoermodus configureert voor streaming-updates voor Unity Catalogtables:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Zie OSS-documenten voor PySpark DataStreamWriter.outputMode of Scala DataStreamWriter.outputMode.

Voorbeeld van stateful streaming- en uitvoermodi

Het volgende voorbeeld is bedoeld om u te helpen redeneren door de manier waarop de uitvoermodus communiceert met watermerken voor stateful streaming.

Overweeg een streamingaggregatie die de totale omzet berekent die elk uur in een winkel wordt gegenereerd met een watermark vertraging van 15 minuten. De eerste microbatch verwerkt de volgende records:

  • $ 15 om 2:40 uur
  • $ 10 om 2:30 uur
  • $ 30 om 13:10 uur

Op dit moment is de watermark van de motor 15:55 uur, omdat deze 15 minuten (de vertraging) aftrekken van de maximale tijd (13:10 uur). De operator voor streamingaggregatie heeft de volgende status:

  • [2pm, 3pm]: $ 25
  • [3pm, 4pm]: $ 30

In de volgende table wordt beschreven wat er in elke uitvoermodus zou gebeuren:

Uitvoermodus Resultaat en reden
Toevoegen De operator voor streamingaggregatie verzendt niets downstream. Dit komt doordat beide vensters kunnen veranderen als er nieuwe values verschijnen na een volgende trigger: de watermark van 2:55 uur geeft aan dat records na 2:55 uur nog steeds kunnen aankomen, waarna deze records kunnen vallen in de [2pm, 3pm]window of de [3pm, 4pm]window.
Update De operator verzendt beide records, omdat beide records updates hebben ontvangen.
Voltooid De operator verzendt alle records.

Stel nu dat de stream nog een record ontvangt:

  • $ 20 om 13:20 uur

De watermark wordt bijgewerkt naar 15:05 uur omdat de engine 15 minuten van 15:20 uur aftrekt. Op dit moment heeft de operator voor streamingaggregatie het volgende in de status:

  • [2pm, 3pm]: $ 25
  • [3pm, 4pm]: $ 50

In de volgende table wordt beschreven wat er in elke uitvoermodus zou gebeuren:

Uitvoermodus Resultaat en reden
Toevoegen De operator voor streamingaggregatie merkt op dat de watermark om 15:05 uur groter is dan het einde van de [2pm, 3pm]window. Door de definitie van de watermarkkan die window niet meer veranderen, dus zendt het de [2pm, 3pm]windowuit.
Update De operator voor streamingaggregatie verzendt de [3pm, 4pm]window omdat de statuswaarde is gewijzigd van $ 30 naar $ 50.
Voltooid De operator verzendt alle records.

Hieronder ziet u hoe stateful operators zich gedragen in elke toevoegmodus:

  • In de toevoegmodus, schrijf eenmaal records na de watermark-vertraging.
  • Schrijf in update-modus de records die zijn gewijzigd vanaf de vorige trigger.
  • Schrijf in de volledige modus alle records die zijn geproduceerd door de stateful operator.