Konfigurera RocksDB-tillståndslager i Azure Databricks
Du kan aktivera RocksDB-baserad tillståndshantering genom att ange följande konfiguration i SparkSession innan du startar strömningsfrågan.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Du kan aktivera RocksDB på Delta Live Tables pipelines. Se Optimize pipeline-konfiguration för tillståndskänslig bearbetning.
Aktivera kontrollpunkter för ändringsloggar
I Databricks Runtime 13.3 LTS och senare kan du aktivera kontrollpunkter för ändringsloggar till lägre varaktighet för kontrollpunkter och svarstid från slutpunkt till slutpunkt för strukturerade strömningsarbetsbelastningar. Databricks rekommenderar att du aktiverar kontrollpunkter för ändringslogg för alla tillståndsfrågor i Structured Streaming.
Traditionellt RocksDB State Store ögonblicksbilder och laddar upp datafiler under kontrollpunkter. För att undvika den här kostnaden skriver ändringsloggkontrollpunkter endast poster som har ändrats sedan den senaste kontrollpunkten till varaktig lagring."
Kontrollpunkter för ändringsloggar är inaktiverade som standard. Du kan aktivera kontrollpunkter för ändringsloggar på SparkSession-nivån med hjälp av följande syntax:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Du kan aktivera kontrollpunkter för ändringsloggar i en befintlig ström och underhålla tillståndsinformationen som lagras i kontrollpunkten.
Viktigt!
Frågor som har aktiverat kontrollpunkter för ändringsloggar kan bara köras på Databricks Runtime 13.3 LTS och senare. Du kan inaktivera kontrollpunkter för ändringsloggar för att återgå till äldre kontrollpunktsbeteende, men du måste fortsätta att köra dessa frågor på Databricks Runtime 13.3 LTS eller senare. Du måste starta om jobbet för att ändringarna ska kunna genomföras.
Mått för RocksDB-tillståndslager
Varje tillståndsoperator samlar in mått relaterade till de tillståndshanteringsåtgärder som utförs på dess RocksDB-instans för att observera tillståndsarkivet och potentiellt hjälpa till med felsökning av jobb långsammare. Dessa mått aggregeras (summa) per tillståndsoperator inom jobbet för alla uppgifter where där tillståndsoperatorn körs. Dessa mått är en del av kartan customMetrics
i fälten stateOperators
i StreamingQueryProgress
. Följande är ett exempel på StreamingQueryProgress
i JSON-formulär (hämtas med ).StreamingQueryProgress.json()
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Detaljerade beskrivningar av måtten är följande:
Måttnamn | beskrivning |
---|---|
rocksdbCommitWriteBatchLatency | Tiden (i millis) tog för att tillämpa mellanlagrade skrivningar i minnesintern struktur (WriteBatch) på interna RocksDB. |
rocksdbCommitFlushLatency | Tiden (i millis) tog för att rensa RocksDB-minnesinterna ändringar till den lokala disken. |
rocksdbCommitCompactLatency | Tiden (i millis) tog för komprimering (valfritt) under kontrollpunktsincheckningen. |
rocksdbCommitPauseLatency | Tiden (i millis) tog för att stoppa bakgrundsarbetartrådarna (för komprimering osv.) som en del av kontrollpunktsincheckningen. |
rocksdbCommitCheckpointLatency | Tiden (i millis) tog för att ta en ögonblicksbild av den interna RocksDB och skriva den till en lokal katalog. |
rocksdbCommitFileSyncLatencyMs | Det tog tid (i millis) att synkronisera de inbyggda RocksDB-ögonblicksbildsrelaterade filerna till en extern lagringsplats (kontrollpunktsplats). |
rocksdbGetLatency | Genomsnittlig tid (i nanos) tog per det underliggande inbyggda RocksDB::Get anropet. |
rocksdbPutCount | Genomsnittlig tid (i nanos) tog per det underliggande inbyggda RocksDB::Put anropet. |
rocksdbGetCount | Antal interna RocksDB::Get anrop (inkluderar Gets inte från WriteBatch – i minnesbatch som används för mellanlagring av skrivningar). |
rocksdbPutCount | Antal interna RocksDB::Put anrop (inkluderar Puts inte WriteBatch – i minnesbatch som används för mellanlagring av skrivningar). |
rocksdbTotalBytesReadByGet | Antal okomprimerade byte som läses via interna RocksDB::Get anrop. |
rocksdbTotalBytesWrittenByPut | Antal okomprimerade byte som skrivits via interna RocksDB::Put anrop. |
rocksdbReadBlockCacheHitCount | Antal gånger som det interna RocksDB-blockcacheminnet används för att undvika att läsa data från en lokal disk. |
rocksdbReadBlockCacheMissCount | Antal gånger det interna RocksDB-blockcacheminnet missade och krävde läsning av data från den lokala disken. |
rocksdbTotalBytesReadByCompaction | Antal byte som lästs från den lokala disken av den interna RocksDB-komprimeringsprocessen. |
rocksdbTotalBytesWrittenByCompaction | Antal byte som skrivits till den lokala disken av den interna RocksDB-komprimeringsprocessen. |
rocksdbTotalCompactionLatencyMs | Tiden (i millis) tog för RocksDB-komprimering (både bakgrund och den valfria komprimering som initierades under incheckningen). |
rocksdbWriterStallLatencyMs | Tid (i millis) skrivaren har stannat på grund av en bakgrundskomprimering eller tömning av memtables till disk. |
rocksdbTotalBytesReadThroughIterator | Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i flatMapGroupsWithState eller vattenstämpling i fönsteraggregeringar) kräver att du läser hela data i DB via iteratorn. Den totala storleken på okomprimerade data som lästs med iteratorn. |