Wat is asynchrone voortgangsbewaking?
Belangrijk
Deze functie bevindt zich in openbare preview-versie.
Met asynchrone voortgangstracering kunnen Structured Streaming-pijplijnen de voortgang in een microbatch asynchroon en parallel aan de daadwerkelijke gegevensverwerking bijhouden, waardoor de latentie bij het onderhouden van de offsetLog
en commitLog
wordt verminderd.
Notitie
Asynchrone voortgangstracering werkt niet met Trigger.once
of Trigger.availableNow
triggers. Als u deze functie probeert in te schakelen met deze triggers, resulteert dit in een queryfout.
Hoe werkt het bijhouden van asynchrone voortgang om de latentie te verminderen?
Gestructureerd streamen is afhankelijk van het behouden en beheren van offsets als voortgangsindicatoren voor het verwerken van query's. Offset beheerbewerking heeft rechtstreeks invloed op de verwerkingslatentie, omdat er geen gegevensverwerking kan plaatsvinden totdat deze bewerkingen zijn voltooid. Met asynchrone voortgangstracering kunnen Structured Streaming-pijplijnen hun voortgang bijhouden zonder dat deze wordt beïnvloed door offset beheerbewerkingen.
Wanneer moet u de controlepuntfrequentie configureren?
Gebruikers kunnen de frequentie configureren waarmee de voortgang wordt gecontroleerd. De standaardinstellingen voor controlepuntfrequentie bieden een goede doorvoer voor de meeste query's. Het configureren van de frequentie is handig voor scenario's waarin offset beheerbewerkingen met een hogere snelheid plaatsvinden dan ze kunnen worden verwerkt, waardoor een steeds toenemende achterstand van offset beheerbewerkingen ontstaat. Om deze groeiende achterstand te verminderen, wordt gegevensverwerking geblokkeerd of vertraagd, waardoor het verwerkingsgedrag in wezen wordt teruggedraaid om de voordelen van asynchrone voortgangstracering te elimineren.
Notitie
De hersteltijd voor fouten neemt toe met de toename van de intervaltijd voor controlepunten. In het geval van een fout moet een pijplijn alle gegevens opnieuw verwerken voordat het vorige controlepunt is geslaagd. Gebruikers kunnen rekening houden met deze afweging tussen lagere latentie tijdens regelmatige verwerking en hersteltijd in geval van storing.
Welke configuraties zijn gekoppeld aan asynchrone voortgangstracering?
Optie | Waarde | Verstek | Beschrijving |
---|---|---|---|
asynchrone voortgangscontrole ingeschakeld | waar/onwaar | vals | asynchrone voortgangstracering in- of uitschakelen |
asyncProgressTrackingCheckpointIntervalMs | Milliseconden | 1000 | het interval waarin we offsets en voltooiingscommits doorvoeren |
Hoe kunnen gebruikers asynchrone voortgangstracering inschakelen?
Gebruikers kunnen code gebruiken die vergelijkbaar is met de onderstaande code om deze functie in te schakelen:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Asynchrone voortgangstracering uitschakelen
Wanneer het bijhouden van asynchrone voortgang is ingeschakeld, slaat het framework de voortgang niet op voor elke batch. U kunt dit oplossen door voordat u asynchrone voortgangstracering uitschakelt, ten minste twee microbatches te verwerken met de volgende instellingen:
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", 0)
Stop de query nadat ten minste twee microbatches zijn verwerkt. U kunt nu het bijhouden van asynchrone voortgang veilig uitschakelen en de query opnieuw starten.
Als u asynchrone voortgangstracering hebt uitgeschakeld zonder deze stap te voltooien, kan de volgende fout optreden:
java.lang.IllegalStateException: batch x doesn't exist
In de stuurprogrammalogboeken ziet u mogelijk de volgende fout:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Als u de instructies in deze sectie volgt om asynchrone voortgangstracering uit te schakelen, kunt u deze fouten oplossen en uw streamingworkload herstellen.
Beperkingen met asynchrone voortgangstracering
Deze functie heeft de volgende beperkingen:
- Asynchrone voortgangstracering wordt alleen ondersteund in staatloze pijplijnen wanneer u Kafka als sink gebruikt.
- Exacte één keer end-to-end verwerking wordt niet gegarandeerd met asynchrone voortgangstracering, omdat offset bereiken voor batch kunnen worden gewijzigd bij een storing. Sommige sinks, zoals Kafka, bieden nooit precies één keer garanties.