Delen via


Wat is asynchrone voortgangsbewaking?

Belangrijk

Deze functie bevindt zich in openbare preview-versie.

Met asynchrone voortgangstracering kunnen Structured Streaming-pijplijnen de voortgang in een microbatch asynchroon en parallel aan de daadwerkelijke gegevensverwerking bijhouden, waardoor de latentie bij het onderhouden van de offsetLog en commitLogwordt verminderd.

Asynchrone voortgangsregistratie

Notitie

Asynchrone voortgangstracering werkt niet met Trigger.once of Trigger.availableNow triggers. Als u deze functie probeert in te schakelen met deze triggers, resulteert dit in een queryfout.

Hoe werkt het bijhouden van asynchrone voortgang om de latentie te verminderen?

Gestructureerd streamen is afhankelijk van het behouden en beheren van offsets als voortgangsindicatoren voor het verwerken van query's. Offset beheerbewerking heeft rechtstreeks invloed op de verwerkingslatentie, omdat er geen gegevensverwerking kan plaatsvinden totdat deze bewerkingen zijn voltooid. Met asynchrone voortgangstracering kunnen Structured Streaming-pijplijnen hun voortgang bijhouden zonder dat deze wordt beïnvloed door offset beheerbewerkingen.

Wanneer moet u de controlepuntfrequentie configureren?

Gebruikers kunnen de frequentie configureren waarmee de voortgang wordt gecontroleerd. De standaardinstellingen voor controlepuntfrequentie bieden een goede doorvoer voor de meeste query's. Het configureren van de frequentie is handig voor scenario's waarin offset beheerbewerkingen met een hogere snelheid plaatsvinden dan ze kunnen worden verwerkt, waardoor een steeds toenemende achterstand van offset beheerbewerkingen ontstaat. Om deze groeiende achterstand te verminderen, wordt gegevensverwerking geblokkeerd of vertraagd, waardoor het verwerkingsgedrag in wezen wordt teruggedraaid om de voordelen van asynchrone voortgangstracering te elimineren.

Notitie

De hersteltijd voor fouten neemt toe met de toename van de intervaltijd voor controlepunten. In het geval van een fout moet een pijplijn alle gegevens opnieuw verwerken voordat het vorige controlepunt is geslaagd. Gebruikers kunnen rekening houden met deze afweging tussen lagere latentie tijdens regelmatige verwerking en hersteltijd in geval van storing.

Welke configuraties zijn gekoppeld aan asynchrone voortgangstracering?

Optie Waarde Verstek Beschrijving
asynchrone voortgangscontrole ingeschakeld waar/onwaar vals asynchrone voortgangstracering in- of uitschakelen
asyncProgressTrackingCheckpointIntervalMs Milliseconden 1000 het interval waarin we offsets en voltooiingscommits doorvoeren

Hoe kunnen gebruikers asynchrone voortgangstracering inschakelen?

Gebruikers kunnen code gebruiken die vergelijkbaar is met de onderstaande code om deze functie in te schakelen:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Asynchrone voortgangstracering uitschakelen

Wanneer het bijhouden van asynchrone voortgang is ingeschakeld, slaat het framework de voortgang niet op voor elke batch. U kunt dit oplossen door voordat u asynchrone voortgangstracering uitschakelt, ten minste twee microbatches te verwerken met de volgende instellingen:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Stop de query nadat ten minste twee microbatches zijn verwerkt. U kunt nu het bijhouden van asynchrone voortgang veilig uitschakelen en de query opnieuw starten.

Als u asynchrone voortgangstracering hebt uitgeschakeld zonder deze stap te voltooien, kan de volgende fout optreden:

java.lang.IllegalStateException: batch x doesn't exist

In de stuurprogrammalogboeken ziet u mogelijk de volgende fout:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Als u de instructies in deze sectie volgt om asynchrone voortgangstracering uit te schakelen, kunt u deze fouten oplossen en uw streamingworkload herstellen.

Beperkingen met asynchrone voortgangstracering

Deze functie heeft de volgende beperkingen:

  • Asynchrone voortgangstracering wordt alleen ondersteund in staatloze pijplijnen wanneer u Kafka als sink gebruikt.
  • Exacte één keer end-to-end verwerking wordt niet gegarandeerd met asynchrone voortgangstracering, omdat offset bereiken voor batch kunnen worden gewijzigd bij een storing. Sommige sinks, zoals Kafka, bieden nooit precies één keer garanties.