Konfigurace úložiště stavů RocksDB v Azure Databricks
Správu stavu na základě RocksDB můžete povolit nastavením následující konfigurace v SparkSession před spuštěním dotazu streamování.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Můžete povolit RocksDB na pipelinech Delta Live Tables. Prohlédněte si konfiguraci potrubí Optimize pro stavové zpracování.
Povolení vytváření kontrolních bodů protokolu změn
Ve službě Databricks Runtime 13.3 LTS a novějších můžete povolit kontrolní body protokolu změn, aby se snížila doba trvání kontrolního bodu a celková latence pro úlohy strukturovaného streamování. Databricks doporučuje zapnout kontrolní body protokolu změn pro všechny stavové dotazy strukturovaného streamování.
Tradičně rocksDB State Store snímky a nahrává datové soubory během vytváření kontrolních bodů. Pokud se chcete těmto nákladům vyhnout, kontrolní body protokolu změn zapisují pouze záznamy, které se změnily od posledního kontrolního bodu do odolného úložiště.
Vytváření kontrolních bodů protokolu změn je ve výchozím nastavení zakázané. Kontrolní body protokolu změn můžete povolit na úrovni SparkSession pomocí následující syntaxe:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
U existujícího datového proudu můžete povolit kontrolní body protokolu změn a zachovat informace o stavu uložené v kontrolním bodu.
Důležité
Dotazy, které povolily vytváření kontrolních bodů protokolu změn, je možné spouštět pouze v Databricks Runtime 13.3 LTS a vyšší. Pokud se chcete vrátit ke staršímu chování kontrolních bodů, můžete zakázat vytváření kontrolních bodů protokolu změn, ale tyto dotazy musíte dál spouštět ve službě Databricks Runtime 13.3 LTS nebo vyšší. Aby se tyto změny projevily, musíte úlohu restartovat.
Metriky úložiště stavu RocksDB
Každý operátor stavu shromažďuje metriky související s operacemi správy stavu provedenými ve své instanci RocksDB, aby bylo možné sledovat úložiště stavů a potenciálně pomoct s zpomalením úlohy ladění. Tyto metriky se sčítají pro každý operátor stavu v úloze napříč všemi úkoly, kde where je operátor stavu spuštěný. Tyto metriky jsou součástí customMetrics
mapy uvnitř stateOperators
polí v StreamingQueryProgress
. Následuje příklad StreamingQueryProgress
ve formátu JSON (získaném pomocí StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Podrobné popisy metrik jsou následující:
Název metriky | Popis |
---|---|
rocksdbCommitWriteBatchLatency | Doba (v milis) trvala použití fázovaných zápisů ve struktuře v paměti (WriteBatch) na nativní RocksDB. |
rocksdbCommitFlushLatency | Čas (v milis) trvalo vyprázdnění změn v paměti RocksDB na místní disk. |
rocksdbCommitCompactLatency | Čas (v milis) trvalo komprimace (volitelné) během potvrzení kontrolního bodu. |
rocksdbCommitPauseLatency | Čas (v milis) trvalo zastavení pracovních vláken na pozadí (pro komprimace atd.) v rámci potvrzení kontrolního bodu. |
rocksdbCommitCheckpointLatency | Čas (v milis) trvalo pořízení snímku nativní rocksDB a jeho zápis do místního adresáře. |
rocksdbCommitFileSyncLatencyMs | Čas (v milis) trvalo synchronizaci nativních souborů souvisejících se snímky RocksDB s externím úložištěm (umístění kontrolního bodu). |
rocksdbGetLatency | Průměrná doba (v nanos) trvala podle základního nativního RocksDB::Get volání. |
rocksdbPutCount | Průměrná doba (v nanos) trvala podle základního nativního RocksDB::Put volání. |
rocksdbGetCount | Počet nativních RocksDB::Get volání (nezahrnuje Gets WriteBatch – v dávce paměti používané pro přípravné zápisy). |
rocksdbPutCount | Počet nativních RocksDB::Put volání (nezahrnuje Puts WriteBatch – v dávce paměti používané pro přípravné zápisy). |
rocksdbTotalBytesReadByGet | Počet nekomprimovaných bajtů přečtených prostřednictvím nativních RocksDB::Get volání. |
rocksdbTotalBytesWrittenByPut | Počet nekomprimovaných bajtů zapsaných prostřednictvím nativních RocksDB::Put volání |
rocksdbReadBlockCacheHitCount | Počet použití nativní mezipaměti bloků RocksDB k tomu, aby se zabránilo čtení dat z místního disku. |
rocksdbReadBlockCacheMissCount | Kolikrát nativní blok RocksDB zmeškala mezipaměť a vyžadovala čtení dat z místního disku. |
rocksdbTotalBytesReadByCompaction | Počet bajtů načtených z místního disku nativním procesem komprimace RocksDB |
rocksdbTotalBytesWrittenByCompaction | Počet bajtů zapsaných na místní disk nativním procesem komprimace RocksDB |
rocksdbTotalCompactionLatencyMs | Čas (v milis) trvalo komprimace RocksDB (pozadí i volitelné komprimace zahájené během potvrzení). |
rocksdbWriterStallLatencyMs | Čas (v milis) zapisovač se zastavil kvůli komprimování pozadí nebo vyprázdnění memtables na disk. |
rocksdbTotalBytesReadThroughIterator | Některé stavové operace (například zpracování časového limitu v flatMapGroupsWithState agregaci s okny nebo vodoznaky) vyžadují čtení celých dat v databázi prostřednictvím iterátoru. Celková velikost nekomprimovaných dat přečtených pomocí iterátoru. |