Konfigurera RocksDB-tillståndslager i Azure Databricks
Du kan aktivera RocksDB-baserad tillståndshantering genom att ange följande konfiguration i SparkSession innan du startar strömningsfrågan.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Du kan aktivera RocksDB på DLT-pipelines. Se Optimera pipeline-konfiguration för tillståndsbaserad bearbetning.
Aktivera kontrollpunkter för ändringsloggar
I Databricks Runtime 13.3 LTS och senare kan du aktivera kontrollpunkter för ändringsloggar för att minska kontrollpunktens varaktighet och end-to-end-latens för strukturerade strömmande arbetsbelastningar. Databricks rekommenderar att du aktiverar kontrollpunkter för ändringslogg för alla tillståndsfrågor i Structured Streaming.
Traditionellt tar RocksDB State Store ögonblicksbilder och laddar upp datafiler under processen för kontrollpunkting. För att undvika den här kostnaden skriver ändringsloggkontrollpunkter endast poster som har ändrats sedan den senaste kontrollpunkten till varaktig lagring."
Kontrollpunkter för ändringsloggar är inaktiverade som standard. Du kan aktivera kontrollpunkter för ändringsloggar på SparkSession-nivån med hjälp av följande syntax:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Du kan aktivera kontrollpunkter för ändringsloggar i en befintlig dataström och bibehålla tillståndsinformationen som lagras i kontrollpunkten.
Viktigt!
Frågor som har aktiverat kontrollpunkter för ändringsloggar kan bara köras på Databricks Runtime 13.3 LTS och senare. Du kan inaktivera kontrollpunkter för ändringsloggar för att återgå till äldre kontrollpunktsbeteende, men du måste fortsätta att köra dessa frågor på Databricks Runtime 13.3 LTS eller senare. Du måste starta om jobbet för att ändringarna ska kunna genomföras.
Metrik för RocksDB-tillståndslager
Varje operatör för tillståndshantering samlar in metrik relaterade till de tillståndshanteringsåtgärder som utförs på dess RocksDB-instans för att övervaka tillståndslagret och potentiellt hjälpa till med felsökning vid långsam jobbprestanda.
Mått för en specifik instans av tillståndslagret är märkta med deras partitions-ID och lagringsnamn för att säkerställa att de förblir åtskilda. Alla andra mått aggregeras (som summa) per tillståndsoperator i jobbet över alla uppgifter där tillståndsoperatorn körs.
Dessa mått är en del av kartan customMetrics
i fälten stateOperators
i StreamingQueryProgress
. Följande är ett exempel på StreamingQueryProgress
i JSON-format (som erhållits med StreamingQueryProgress.json()
).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Detaljerade beskrivningar av måtten är följande:
Måttnamn | beskrivning |
---|---|
rocksdbCommitWriteBatchLatency | Tiden (i millis) som togs för att tillämpa stegvis skrivningar i en in-memory-struktur (WriteBatch) till native RocksDB. |
rocksdbCommitFlushLatency | Tid (i millis) för att spola ändringar i RockDB:s minne till lokal disk. |
rocksdbCommitCompactLatency | Tiden (i millisekunder) som krävdes för komprimering (valfritt) under genomförandet av kontrollpunkten. |
rocksdbCommitPauseLatency | Tiden (i millisekunder) det tog att stoppa bakgrundsarbetartrådarna (för kompaktöring med mera) som en del av kontrollpunktsåtgärden. |
rocksdbCommitCheckpointLatency | Tiden (i millis) det tog att ta en ögonblicksbild av det inbyggda RocksDB och skriva den till en lokal katalog. |
rocksdbCommitFileSyncLatencyMs | Tiden (i millisekunder) som det tog att synkronisera de inbyggda RocksDB-snapshotrelaterade filerna till en extern lagringsplats (kontrollpunktsplats). |
rocksdbGetLatency | Genomsnittlig tid (i nanos) som det ursprungliga RocksDB::Get -anropet tog. |
rocksdbPutCount | Genomsnittlig tid (i nanos) som det ursprungliga RocksDB::Put -anropet tog. |
rocksdbGetCount | Antal interna RocksDB::Get anrop (inkluderar Gets inte från WriteBatch – i minnesbatch som används för mellanlagring av skrivningar). |
rocksdbPutCount | Antal inhemska RocksDB::Put anrop (inkluderar inte Puts WriteBatch – en minnesbatch som används för mellanlagring av skrivningar). |
rocksdbTotalBytesReadByGet | Antal okomprimerade byte som läses via interna RocksDB::Get anrop. |
rocksdbTotalBytesWrittenByPut (Totala Bytes Skrivna av Put) | Antal okomprimerade byte som skrivits via interna RocksDB::Put anrop. |
rocksdbReadBlockCacheHitCount | Antal gånger som det interna RocksDB-blockcacheminnet används för att undvika att läsa data från en lokal disk. |
rocksdbReadBlockCacheMissCount | Antal gånger det inbyggda RocksDB-blockcacheminnet missade och krävde läsning av data från den lokala disken. |
rocksdbTotaltBitarLästaVidKompaktering | Antal byte som lästs från den lokala disken av den interna RocksDB-komprimeringsprocessen. |
rocksdbTotalBytesWrittenByCompaction | Antal byte som skrivits till den lokala disken av den interna RocksDB-komprimeringsprocessen. |
rocksdbTotalCompactionLatencyMs | Tiden (i millis) tog för RocksDB-komprimeringar (både bakgrunds- och den valfria komprimering som initierades under commiten). |
rocksdbWriterStallLatencyMs | Tid (i millisekunder) skrivprocessen har avstannat på grund av en bakgrundskomprimering eller tömning av "memtables" till disk. |
rocksdbTotalBytesReadThroughIterator | Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i flatMapGroupsWithState eller vattenstämpling i fönsteraggregeringar) kräver att du läser hela data i DB via iteratorn. Den totala storleken på okomprimerade data som lästs med iteratorn. |