Поделиться через


Настройка хранилища состояний RocksDB в Azure Databricks

Вы можете включить управление состоянием на основе RocksDB, задав следующую конфигурацию в SparkSession перед началом потокового запроса.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Вы можете включить RocksDB в конвейерах DLT. См. раздел Оптимизация конфигурации конвейера для обработки с сохранением состояния.

Включение контрольных точек журнала изменений

В Databricks Runtime 13.3 LTS и более поздних версиях вы можете включить контрольную точку отслеживания изменений, чтобы сократить время контрольной точки и уменьшить сквозную задержку для рабочих нагрузок структурированной потоковой передачи. Databricks рекомендует включать контрольные точки журнала изменений для всех запросов структурированных потоков с отслеживанием состояния.

Традиционно в хранилище состояний RocksDB создаются моментальные снимки и загружаются файлы данных во время создания контрольной точки. Чтобы избежать этих затрат, контрольные точки журнала изменений записывают на надежное хранилище только те записи, которые изменились с момента последней контрольной точки.

Контроль журналирования изменений отключен по умолчанию. Вы можете включить контрольные точки журнала изменений на уровне SparkSession с помощью следующего синтаксиса:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Вы можете включить контрольную точку журнала изменений в существующем потоке и сохранить сведения о состоянии, хранящиеся в контрольной точке.

Внимание

Запросы, в которых включена фиксация контрольных точек журнала изменений, могут выполняться только в Databricks Runtime 13.3 LTS или более поздних версиях. Вы можете отключить контрольную точку журнала изменений, чтобы вернуться к устаревшему поведению контрольных точек, но вы должны продолжать выполнять соответствующие запросы на Databricks Runtime 13.3 LTS или более поздней версии. Для выполнения этих изменений необходимо перезапустить задание.

Метрики хранилища состояний RocksDB

Каждый оператор состояния собирает метрики, связанные с операциями управления состоянием, выполненными на экземпляре RocksDB, чтобы следить за хранилищем состояний и, возможно, помочь в отладке медленной работы задач.

Метрики для конкретного экземпляра хранилища состояния помечены идентификаторами их разделов и именами хранилищ, что гарантирует их раздельное существование. Все остальные метрики агрегируются (суммой) по оператору состояния в задании для всех задач, где работает оператор состояния.

Эти метрики являются частью карты customMetrics, находящейся внутри полей stateOperators в StreamingQueryProgress. Ниже приведен пример StreamingQueryProgress в форме JSON (получен с помощью StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Ниже приведены подробные описания метрик.

Имя метрики Описание
Задержка фиксации пакета записей rocksdb (rocksdbCommitWriteBatchLatency) Время (в миллисекундах), затраченное на применение стадированных операций записи в структуре в памяти (WriteBatch) к родной базе данных RocksDB.
rocksdbCommitFlushLatency Время (в миллисекундах), которое понадобилось для очистки изменений в памяти RocksDB на локальном диске.
rocksdbCommitCompactLatency Время (в миллисекундах), которое понадобилось для сжатия (необязательного) во время фиксации контрольной точки.
rocksdbCommitPauseLatency Время (в миллисекундах), затраченное на остановку фоновых рабочих потоков (для компактизации и т. д.) в процессе фиксации контрольной точки.
rocksdbCommitCheckpointLatency Время (в миллисекундах), которое понадобилось для создания моментального снимка встроенного RocksDB и его записи в локальный каталог.
rocksdbCommitFileSyncLatencyMs Время (в миллисекундах), которое понадобилось на синхронизацию файлов, связанных с собственным моментальным снимком RocksDB, со внешним хранилищем в месте контрольной точки.
rocksdbGetLatency Среднее время (в наносекундах), затраченное на один базовый нативный вызов RocksDB::Get.
rocksdbPutCount Среднее время (в наносекундах), затраченное на один основной нативный вызов RocksDB::Put.
rocksdbGetCount Число собственных вызовов RocksDB::Get (без учета Gets из WriteBatch — пакет в памяти, используемый для промежуточного сохранения записей).
rocksdbPutCount Число собственных вызовов RocksDB::Put (без учета Puts в WriteBatch — в пакете памяти, используемом для промежуточной записи).
rocksdbTotalBytesReadByGet Число несжатых байтов, прочитанных с помощью собственных вызовов RocksDB::Get.
rocksdbОбщееЧислоБайтЗаписанныхСПомощьюPut Число несжатых байтов, записанных с помощью нативных вызовов RocksDB::Put.
rocksdbReadBlockCacheHitCount Количество использований собственного кэша блоков RocksDB, чтобы избежать считывания данных с локального диска.
КоличествоПропусковКэшаЧтенияБлоковRocksDB Количество промахов нативного кэша блоков RocksDB, требующего считывания данных с локального диска.
Общее количество байт, прочитанных при сжатии rocksdb Число байтов, считанных с локального диска с помощью собственного процесса сжатия RocksDB.
rocksdbОбщееКоличествоБайтЗаписанныхКомпакцией Число байтов, записанных на локальный диск с помощью собственного процесса сжатия RocksDB.
rocksdbTotalCompactionLatencyMs Время (в миллисекундах), потраченное на сжатие RocksDB (как фоновое, так и дополнительное сжатие, инициированное во время фиксации).
rocksdbWriterStallLatencyMs (задержка остановки записи в RocksDB в миллисекундах) Время (в миллисекундах), которое модуль записи простаивал из-за фоновой компакции или сброса мемтаблиц на диск.
"суммарное количество байт, прочитанных через итератор в базе данных RocksDB" Некоторые из операций с отслеживанием состояния (например, обработка времени ожидания в flatMapGroupsWithState или установка водяных знаков в окнах агрегирования) требует чтения всех данных в базе данных с помощью итератора. Общий размер несжатых данных, считанных с помощью итератора.