在 Azure Databricks 上配置 RocksDB 状态存储

可以在启动流式处理查询之前,通过在 SparkSession 中设置以下配置来启用基于 RocksDB 的状态管理。

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

可以在增量实时表管道上启用 RocksDB。 请参阅针对有状态处理优化管道配置

启用更改日志检查点

在 Databricks Runtime 13.3 LTS 及更高版本中,可以启用更改日志检查点功能,以降低结构化流工作负载的检查点持续时间和端到端延迟。 Databricks 建议为所有“结构化流式处理”有状态查询启用更改日志检查点。

以前,RocksDB 状态存储会在检查点操作期间对数据文件拍摄快照和进行上传。 为了避免这种成本,更改日志检查点功能会仅将自上一个检查点以来发生了更改的记录写入持久存储。”

默认情况下,更改日志检查点功能处于禁用状态。 可以使用以下语法在 SparkSession 级别启用更改日志检查点:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

可以在现有流上启用更改日志检查点,并维护存储在检查点中的状态信息。

重要

已启用更改日志检查点功能的查询只能在 Databricks Runtime 13.3 LTS 及更高版本上运行。 你可以禁用更改日志检查点功能以还原旧有的检查点行为,但必须继续在 Databricks Runtime 13.3 LTS 或更高版本上运行这些查询。 必须重启作业才能完成这些更改。

RocksDB 状态存储指标

每个状态运算符收集有关在其 RocksDB 实例上执行的状态管理操作的指标,这些指标可用于观测状态存储,并可能有助于调试作业速度缓慢问题。 这些指标将会根据运行状态运算符的所有任务中的每个作业中状态运算符进行聚合(加总)。 这些指标是 StreamingQueryProgressstateOperators 字段内的 customMetrics 映射的一部分。 下面是采用 JSON 格式的 StreamingQueryProgress 的示例(使用 StreamingQueryProgress.json() 获取)。

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

指标的详细说明如下:

指标名称 说明
rocksdbCommitWriteBatchLatency 将内存中结构中的分阶段写入 (WriteBatch) 应用于本机 RocksDB 所花费的时间(以毫秒为单位)。
rocksdbCommitFlushLatency 将 RocksDB 内存中更改刷新到本地磁盘所花费的时间(以毫秒为单位)。
rocksdbCommitCompactLatency 在检查点提交期间进行压缩(可选)所花费的时间(以毫秒为单位)。
rocksdbCommitPauseLatency 在检查点提交过程中停止后台工作线程(以实现压缩等目的)所花费的时间(以毫秒为单位)。
rocksdbCommitCheckpointLatency 创建本机 RocksDB 快照并将其写入本地目录所花费的时间(以毫秒为单位)。
rocksdbCommitFileSyncLatencyMs 将本机 RocksDB 快照相关的文件同步到外部存储(检查点位置)所花费的时间(以毫秒为单位)。
rocksdbGetLatency 每个基础本机 RocksDB::Get 调用平均花费的时间(以纳秒为单位)。
rocksdbPutCount 每个基础本机 RocksDB::Put 调用平均花费的时间(以纳秒为单位)。
rocksdbGetCount 本机 RocksDB::Get 调用的数量(不包括从 WriteBatch 执行的 Gets - 用于暂存写入的内存中批处理)。
rocksdbPutCount 本机 RocksDB::Put 调用数量(不包括对 WriteBatch 执行的 Puts - 用于暂存写入的内存中批处理)。
rocksdbTotalBytesReadByGet 通过本机 RocksDB::Get 调用读取的未压缩字节数。
rocksdbTotalBytesWrittenByPut 通过本机 RocksDB::Put 调用写入的未压缩字节数。
rocksdbReadBlockCacheHitCount 使用本机 RocksDB 块缓存避免从本地磁盘读取数据的次数。
rocksdbReadBlockCacheMissCount 从本地磁盘读取数据所需的未命中本机 RocksDB 块缓存次数。
rocksdbTotalBytesReadByCompaction 本机 RocksDB 压缩进程从本地磁盘读取的字节数。
rocksdbTotalBytesWrittenByCompaction 本机 RocksDB 压缩进程写入本地磁盘的字节数。
rocksdbTotalCompactionLatencyMs RocksDB 压缩(后台压缩以及在提交期间启动的可选压缩)所花费的时间(以毫秒为单位)。
rocksdbWriterStallLatencyMs 由于后台压缩或将内存表刷新到磁盘而导致写入器停滞的时间(以毫秒为单位)。
rocksdbTotalBytesReadThroughIterator 某些有状态操作(例如 flatMapGroupsWithState 中的超时处理或窗口聚合中的水印处理)需要通过迭代器读取 DB 中的整个数据。 使用迭代器读取的未压缩数据的总大小。