Asynkron tillståndskontroll för tillståndskänsliga frågor
Anteckning
Finns i Databricks Runtime 10.4 LTS och senare.
Asynkrona tillståndskontrollpunkter upprätthåller exakt-en-gång-garantier för strömningsfrågor men kan minska den totala latenstiden för vissa strukturerade strömmande arbetsbelastningar som blir en flaskhals vid tillståndsuppdateringar. Detta uppnås genom att börja bearbeta nästa mikrobatch så snart beräkningen av den tidigare mikrobatchen har slutförts utan att vänta på att tillståndskontrollerna ska slutföras. I följande tabell jämförs kompromisserna för synkrona och asynkrona kontrollpunkter:
Karakteristisk | Synkrona kontrollpunkter | Asynkron kontrollpunkt |
---|---|---|
Svarstid | Högre svarstid för varje mikrobatch. | Kortare svarstid eftersom mikrobatch kan överlappa varandra. |
Starta om | Snabb återhämtning eftersom endast den sista omgången måste köras igen. | Högre omstartsfördröjning eftersom flera mikrobatcher kan behöva köras om. |
Följande är egenskaper för direktuppspelningsjobb som kan dra nytta av asynkrona kontrollpunkter för tillstånd:
Jobbet har en eller flera tillståndskänsliga operationer (t.ex. aggregering,
flatMapGroupsWithState
,mapGroupsWithState
, ström-ström-sammanfogningar)Fördröjning vid kontrollpunkter för tillstånd är en av de största orsakerna till den övergripande fördröjningen vid batchkörning. Den här informationen finns i StreamingQueryProgress-händelserna . Dessa händelser finns även i log4j-loggar på Spark-drivrutinen. Här är ett exempel på förlopp för strömmande frågor och hur du hittar statuskontrollpunktens inverkan på den övergripande svarstiden för batchkörning.
-
{ "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19", "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe", "...", "batchId" : 0, "durationMs" : { "...", "triggerExecution" : 547730, "..." }, "stateOperators" : [ { "...", "commitTimeMs" : 3186626, "numShufflePartitions" : 64, "..." }] }
Svarstidsanalys för tillståndskontroll för ovanstående frågeförloppshändelse
- Batchvaraktighet (
durationMs.triggerDuration
) är cirka 547 sekunder. - Svarstiden för att slutföra tillståndslager (
stateOperations[0].commitTimeMs
) är cirka 3 186 sekunder. Svarstiden för incheckning aggregeras mellan aktiviteter som innehåller ett tillståndslager. I det här fallet finns det 64 sådana uppgifter (stateOperators[0].numShufflePartitions
). - Varje uppgift som innehåller tillståndsoperatorn tog i genomsnitt 50 sek (3 186/64) för kontrollpunkt. Det här är en extra fördröjning som bidrar till batchvaraktigheten. Förutsatt att alla 64 aktiviteter körs samtidigt bidrog kontrollpunktssteget med cirka 9 % (50 sek/547 sek) av batchvaraktigheten. Procentandelen blir ännu högre när de maximala samtidiga uppgifterna är mindre än 64.
- Batchvaraktighet (
-
Aktivera asynkrona kontrollpunkter för tillstånd
Du måste använda det RocksDB-baserade tillståndsarkivet för asynkrona kontrollpunkter för tillstånd. Ange följande konfigurationer:
spark.conf.set(
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
"true"
)
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
Begränsningar och krav för asynkron kontrollpunkt
Kommentar
Automatisk skalning av beräkning har begränsningar för att skala ned klusterstorleken för arbetsbelastningar med strukturerad direktuppspelning. Databricks rekommenderar att du använder DLT med förbättrad automatisk skalning för strömningsarbetsbelastningar. Se Optimera klusteranvändningen av DLT-pipelines med förbättrad automatisk skalning.
- Eventuella fel i en asynkron kontrollpunkt i ett eller flera butiker misslyckas med frågan. I synkront kontrollpunktsläge körs kontrollpunkten som en del av uppgiften och Spark försöker utföra uppgiften flera gånger innan frågan misslyckas. Den här mekanismen finns inte med asynkron tillståndskontroll. Databricks rekommenderar att du använder kontinuerliga jobb för automatiska återförsök vid jobbfel. Se även Köra jobb kontinuerligt.
- Asynkron kontrollpunktsprocess fungerar bäst när lagringsplatserna för tillståndsdata inte ändras mellan mikrobatchkörningar. Storleksändring av kluster i kombination med asynkrona kontrollpunkter för tillstånd kanske inte fungerar bra eftersom tillståndslagerinstansen kan distribueras om när noder läggs till eller tas bort som en del av klusterändringshändelsen.
- Asynkrona kontrollpunkter för tillstånd stöds endast i implementeringen av RocksDB-tillståndslagerprovidern. Standardimplementeringen för minnesintern tillståndslagring har inte stöd för det.