Поделиться через


Настройка хранилища состояний RocksDB в Azure Databricks

Вы можете включить управление состоянием на основе RocksDB, задав следующую конфигурацию в SparkSession перед началом потокового запроса.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Вы можете включить RocksDB в конвейерах Delta Live Tables. См. раздел "Оптимизация конфигурации конвейера" для обработки с отслеживанием состояния.

Включение контрольных точек журнала изменений

В Databricks Runtime 13.3 LTS и более поздних версиях можно включить контрольную точку журнала изменений, чтобы снизить длительность контрольной точки и сквозную задержку для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует включать контрольные точки журнала изменений для всех запросов структурированных потоков с отслеживанием состояния.

Традиционно моментальные снимки хранилища состояний RocksDB и передают файлы данных во время контрольной точки. Чтобы избежать этой стоимости, контрольная точка журнала изменений записывает только записи, которые изменились с момента последней контрольной точки на устойчивое хранилище".

Контрольная точка журнала изменений отключена по умолчанию. Вы можете включить контрольные точки журнала изменений на уровне SparkSession с помощью следующего синтаксиса:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Вы можете включить контрольную точку журнала изменений в существующем потоке и сохранить сведения о состоянии, хранящиеся в контрольной точке.

Внимание

Запросы, которые включили контрольную точку журнала изменений, могут выполняться только в Databricks Runtime 13.3 LTS и более поздних версиях. Вы можете отключить контрольную точку журнала изменений, чтобы вернуться к устаревшей поведению контрольных точек, но вы должны продолжать выполнять эти запросы в Databricks Runtime 13.3 LTS или более поздней версии. Для выполнения этих изменений необходимо перезапустить задание.

Метрики хранилища состояний RocksDB

Каждый оператор состояния собирает метрики, связанные с операциями управления состоянием, выполненными на экземпляре RocksDB, для наблюдения за хранилищем состояний и, возможно, помощью в отладке медленной работы заданий. Эти метрики агрегируются (суммируются) для каждого оператора состояния в задании по всем задачам, в которых выполняется оператор состояния. Эти метрики являются частью сопоставлений customMetrics внутри полей stateOperators в StreamingQueryProgress. Ниже приведен пример StreamingQueryProgress в форме JSON (получен с помощью StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Ниже приведены подробные описания метрик.

Имя метрики Description
rocksdbCommitWriteBatchLatency Время (в миллисекундах), которое понадобилось для применения промежуточных операций записи в структуре в памяти (WriteBatch) к собственному RocksDB.
rocksdbCommitFlushLatency Время (в миллисекундах), которое понадобилось для очистки изменений в памяти RocksDB на локальном диске.
rocksdbCommitCompactLatency Время (в миллисекундах), которое понадобилось для сжатия (необязательного) во время фиксации контрольной точки.
rocksdbCommitPauseLatency Время (в миллисекундах), которое понадобилось для остановки фоновых рабочих потоков (для сжатия и т. д.) в рамках фиксации контрольной точки.
rocksdbCommitCheckpointLatency Время (в миллисекундах), которое понадобилось для создания моментального снимка собственного RockDB и запись его в локальный каталог.
rocksdbCommitFileSyncLatencyMs Время (в миллисекундах), которое понадобилось для синхронизации файлов, связанных с моментальным снимком собственного RocksDB, со внешним хранилищем (расположением контрольной точки).
rocksdbGetLatency Среднее время (в наносекундах), затраченное на базовый вызов собственного RocksDB::Get.
rocksdbPutCount Среднее время (в наносекундах), затраченное на базовый вызов собственного RocksDB::Put.
rocksdbGetCount Число собственных вызовов RocksDB::Get (без учета Gets из WriteBatch — в пакете памяти, используемом для промежуточной записи).
rocksdbPutCount Число собственных вызовов RocksDB::Put (без учета Puts в WriteBatch — в пакете памяти, используемом для промежуточной записи).
rocksdbTotalBytesReadByGet Число несжатых байтов, считанных с помощью собственных вызовов RocksDB::Get.
rocksdbTotalBytesWrittenByPut Число несжатых байтов, записанных с помощью собственных вызовов RocksDB::Put.
rocksdbReadBlockCacheHitCount Количество использований собственного кэша блоков RocksDB, чтобы избежать считывания данных с локального диска.
rocksdbReadBlockCacheMissCount Количество промахов собственного кэша блоков RocksDB, которые требовались для считывания данных с локального диска.
rocksdbTotalBytesReadByCompaction Число байтов, считанных с локального диска с помощью собственного процесса сжатия RocksDB.
rocksdbTotalBytesWrittenByCompaction Число байтов, записанных на локальный диск с помощью собственного процесса сжатия RocksDB.
rocksdbTotalCompactionLatencyMs Время (в миллисекундах), потраченное на сжатие RocksDB (как фоновое, так и дополнительное сжатие, инициированное во время фиксации).
rocksdbWriterStallLatencyMs Время (в миллисекундах), которое модуль записи простаивал из-за фонового сжатия или сброса объектов memtable на диск.
rocksdbTotalBytesReadThroughIterator Некоторые из операций с отслеживанием состояния (например, обработка времени ожидания в flatMapGroupsWithState или добавление пределов в агрегатах с окнами) требуют считывания всех данных в базе данных с помощью итератора. Общий размер несжатых данных, считанных с помощью итератора.