RocksDB-statusopslag configureren in Azure Databricks
U kunt statusbeheer op basis van RocksDB inschakelen door de volgende configuratie in de SparkSession in te stellen voordat u de streamingquery start.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
U kunt RocksDB inschakelen in Delta Live Tables-pijplijnen. Zie Pijplijnconfiguratie optimaliseren voor stateful verwerking.
Controlepunten voor wijzigingenlogboek inschakelen
In Databricks Runtime 13.3 LTS en hoger kunt u changelog-controlepunten inschakelen om de duur van het controlepunt en de end-to-end-latentie voor structured streaming-workloads te verlagen. Databricks raadt aan controlepunten in het wijzigingenlogboek in te schakelen voor alle stateful query's voor gestructureerd streamen.
Traditionele RocksDB State Store-momentopnamen en uploadt gegevensbestanden tijdens controlepunten. Om deze kosten te voorkomen, schrijft changelog-controlepunten alleen records die zijn gewijzigd sinds het laatste controlepunt naar duurzame opslag.
Changelog-controlepunten zijn standaard uitgeschakeld. U kunt controlepunten voor wijzigingenlogboeken inschakelen in het SparkSession-niveau met behulp van de volgende syntaxis:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
U kunt controlepunten voor wijzigingenlogboeken inschakelen voor een bestaande stroom en de statusgegevens onderhouden die zijn opgeslagen in het controlepunt.
Belangrijk
Query's waarvoor controlepunten voor wijzigingenlogboeken zijn ingeschakeld, kunnen alleen worden uitgevoerd op Databricks Runtime 13.3 LTS en hoger. U kunt changelog-controlepunten uitschakelen om terug te keren naar het verouderde controlepuntgedrag, maar u moet deze query's blijven uitvoeren op Databricks Runtime 13.3 LTS of hoger. U moet de taak opnieuw starten om deze wijzigingen te kunnen uitvoeren.
Metrische gegevens van de rocksDB-statusopslag
Elke statusoperator verzamelt metrische gegevens met betrekking tot de statusbeheerbewerkingen die zijn uitgevoerd op het RocksDB-exemplaar om het statusarchief te observeren en mogelijk te helpen bij het opsporen van fouten in de taak traagheid. Deze metrische gegevens worden geaggregeerd (som) per statusoperator in de taak voor alle taken waarin de statusoperator wordt uitgevoerd. Deze metrische gegevens maken deel uit van de customMetrics
kaart in de stateOperators
velden in StreamingQueryProgress
. Hier volgt een voorbeeld van in JSON-formulier (verkregen met behulp van StreamingQueryProgress
StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Gedetailleerde beschrijvingen van de metrische gegevens zijn als volgt:
Naam van meetwaarde | Beschrijving |
---|---|
rocksdbCommitWriteBatchLatency | De tijd (in millis) duurde voor het toepassen van de gefaseerde schrijfbewerkingen in de geheugenstructuur (WriteBatch) op systeemeigen RocksDB. |
rocksdbCommitFlushLatency | Tijd (in millis) voor het leegmaken van de RocksDB-in-memory wijzigingen in de lokale schijf. |
rocksdbCommitCompactLatency | Tijd (in millis) duurde voor compressie (optioneel) tijdens het doorvoeren van het controlepunt. |
rocksdbCommitPauseLatency | Tijd (in millis) voor het stoppen van de achtergrondwerkrolthreads (voor compressie, enzovoort) als onderdeel van de controlepuntdoorvoering. |
rocksdbCommitCheckpointLatency | Tijd (in millis) voor het maken van een momentopname van systeemeigen RocksDB en schrijven naar een lokale map. |
rocksdbCommitFileSyncLatencyMs | Tijd (in millis) voor het synchroniseren van de systeemeigen RocksDB-momentopname gerelateerde bestanden aan een externe opslag (controlepuntlocatie). |
rocksdbGetLatency | De gemiddelde tijd (in nanos) duurde per onderliggende systeemeigen RocksDB::Get aanroep. |
rocksdbPutCount | De gemiddelde tijd (in nanos) duurde per onderliggende systeemeigen RocksDB::Put aanroep. |
rocksdbGetCount | Aantal systeemeigen RocksDB::Get aanroepen (omvat Gets niet van WriteBatch- in de geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen). |
rocksdbPutCount | Aantal systeemeigen RocksDB::Put aanroepen (omvat Puts niet naar WriteBatch- in de geheugenbatch die wordt gebruikt voor faseringsschrijfbewerkingen). |
rocksdbTotalBytesReadByGet | Aantal niet-gecomprimeerde bytes dat door systeemeigen RocksDB::Get aanroepen wordt gelezen. |
rocksdbTotalBytesWrittenByPut | Aantal niet-gecomprimeerde bytes geschreven via systeemeigen RocksDB::Put aanroepen. |
rocksdbReadBlockCacheHitCount | Aantal keren dat de systeemeigen RocksDB-blokcache wordt gebruikt om te voorkomen dat gegevens van lokale schijf worden gelezen. |
rocksdbReadBlockCacheMissCount | Aantal keren dat de systeemeigen RocksDB cache gemist en vereiste leesgegevens van de lokale schijf. |
rocksdbTotalBytesReadByCompaction | Het aantal bytes dat van de lokale schijf is gelezen door het systeemeigen RocksDB-compressieproces. |
rocksdbTotalBytesWrittenByCompaction | Het aantal bytes dat naar de lokale schijf is geschreven door het systeemeigen RocksDB-compressieproces. |
rocksdbTotalCompactionLatencyMs | Tijd (in millis) nam voor RocksDB-compressies (zowel achtergrond als de optionele compressie geïnitieerd tijdens de doorvoer). |
rocksdbWriterStallLatencyMs | Tijd (in millis) is de schrijver vastgelopen vanwege een achtergrondcompressie of het leegmaken van de memtables op schijf. |
rocksdbTotalBytesReadThroughIterator | Voor sommige stateful bewerkingen (zoals time-outverwerking in flatMapGroupsWithState of watermerken in gevensterde aggregaties) moeten volledige gegevens in DB worden gelezen via iterator. De totale grootte van niet-gecomprimeerde gegevens die worden gelezen met behulp van de iterator. |