Настройка хранилища состояний RocksDB в Azure Databricks
Вы можете включить управление состоянием на основе RocksDB, задав следующую конфигурацию в SparkSession перед началом потокового запроса.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Вы можете включить RocksDB в конвейерах Delta Live Tables. См. раздел "Оптимизация конфигурации конвейера" для обработки с отслеживанием состояния.
Включение контрольных точек журнала изменений
В Databricks Runtime 13.3 LTS и более поздних версиях можно включить контрольную точку журнала изменений, чтобы снизить длительность контрольной точки и сквозную задержку для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует включать контрольные точки журнала изменений для всех запросов структурированных потоков с отслеживанием состояния.
Традиционно моментальные снимки хранилища состояний RocksDB и передают файлы данных во время контрольной точки. Чтобы избежать этой стоимости, контрольная точка журнала изменений записывает только записи, которые изменились с момента последней контрольной точки на устойчивое хранилище".
Контрольная точка журнала изменений отключена по умолчанию. Вы можете включить контрольные точки журнала изменений на уровне SparkSession с помощью следующего синтаксиса:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
Вы можете включить контрольную точку журнала изменений в существующем потоке и сохранить сведения о состоянии, хранящиеся в контрольной точке.
Внимание
Запросы, которые включили контрольную точку журнала изменений, могут выполняться только в Databricks Runtime 13.3 LTS и более поздних версиях. Вы можете отключить контрольную точку журнала изменений, чтобы вернуться к устаревшей поведению контрольных точек, но вы должны продолжать выполнять эти запросы в Databricks Runtime 13.3 LTS или более поздней версии. Для выполнения этих изменений необходимо перезапустить задание.
Метрики хранилища состояний RocksDB
Каждый оператор состояния собирает метрики, связанные с операциями управления состоянием, выполненными на экземпляре RocksDB, для наблюдения за хранилищем состояний и, возможно, помощью в отладке медленной работы заданий. Эти метрики агрегируются (суммируются) для каждого оператора состояния в задании по всем задачам, в которых выполняется оператор состояния. Эти метрики являются частью сопоставлений customMetrics
внутри полей stateOperators
в StreamingQueryProgress
. Ниже приведен пример StreamingQueryProgress
в форме JSON (получен с помощью StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Ниже приведены подробные описания метрик.
Имя метрики | Description |
---|---|
rocksdbCommitWriteBatchLatency | Время (в миллисекундах), которое понадобилось для применения промежуточных операций записи в структуре в памяти (WriteBatch) к собственному RocksDB. |
rocksdbCommitFlushLatency | Время (в миллисекундах), которое понадобилось для очистки изменений в памяти RocksDB на локальном диске. |
rocksdbCommitCompactLatency | Время (в миллисекундах), которое понадобилось для сжатия (необязательного) во время фиксации контрольной точки. |
rocksdbCommitPauseLatency | Время (в миллисекундах), которое понадобилось для остановки фоновых рабочих потоков (для сжатия и т. д.) в рамках фиксации контрольной точки. |
rocksdbCommitCheckpointLatency | Время (в миллисекундах), которое понадобилось для создания моментального снимка собственного RockDB и запись его в локальный каталог. |
rocksdbCommitFileSyncLatencyMs | Время (в миллисекундах), которое понадобилось для синхронизации файлов, связанных с моментальным снимком собственного RocksDB, со внешним хранилищем (расположением контрольной точки). |
rocksdbGetLatency | Среднее время (в наносекундах), затраченное на базовый вызов собственного RocksDB::Get . |
rocksdbPutCount | Среднее время (в наносекундах), затраченное на базовый вызов собственного RocksDB::Put . |
rocksdbGetCount | Число собственных вызовов RocksDB::Get (без учета Gets из WriteBatch — в пакете памяти, используемом для промежуточной записи). |
rocksdbPutCount | Число собственных вызовов RocksDB::Put (без учета Puts в WriteBatch — в пакете памяти, используемом для промежуточной записи). |
rocksdbTotalBytesReadByGet | Число несжатых байтов, считанных с помощью собственных вызовов RocksDB::Get . |
rocksdbTotalBytesWrittenByPut | Число несжатых байтов, записанных с помощью собственных вызовов RocksDB::Put . |
rocksdbReadBlockCacheHitCount | Количество использований собственного кэша блоков RocksDB, чтобы избежать считывания данных с локального диска. |
rocksdbReadBlockCacheMissCount | Количество промахов собственного кэша блоков RocksDB, которые требовались для считывания данных с локального диска. |
rocksdbTotalBytesReadByCompaction | Число байтов, считанных с локального диска с помощью собственного процесса сжатия RocksDB. |
rocksdbTotalBytesWrittenByCompaction | Число байтов, записанных на локальный диск с помощью собственного процесса сжатия RocksDB. |
rocksdbTotalCompactionLatencyMs | Время (в миллисекундах), потраченное на сжатие RocksDB (как фоновое, так и дополнительное сжатие, инициированное во время фиксации). |
rocksdbWriterStallLatencyMs | Время (в миллисекундах), которое модуль записи простаивал из-за фонового сжатия или сброса объектов memtable на диск. |
rocksdbTotalBytesReadThroughIterator | Некоторые из операций с отслеживанием состояния (например, обработка времени ожидания в flatMapGroupsWithState или добавление пределов в агрегатах с окнами) требуют считывания всех данных в базе данных с помощью итератора. Общий размер несжатых данных, считанных с помощью итератора. |