共用方式為


在 Azure Databricks 設定 RocksDB 狀態存放區

您可以在啟動串流查詢之前,先在SparkSession 中設定下列設定,以啟用RocksDB型狀態管理。

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

您可以在 DLT 管線上啟用 RocksDB。 請參閱 優化管線組態以進行具狀態處理

啟用變更記錄檢查點

在 Databricks Runtime 13.3 LTS 和更新版本中,您可以啟用變更記錄檢查點,以降低結構化串流工作負載的檢查點持續時間和端對端延遲。 Databricks 建議為所有具狀態的結構化串流查詢啟用變更記錄檢查點。

傳統上,RocksDB State 存儲區會在檢查點期間快照並上傳資料檔案。 為了避免這項成本,changelog 檢查點檢查僅將自上次檢查點後變更的記錄寫入永久儲存。

預設停用變更日誌檢查點。 您可以使用下列語法,在 SparkSession 層級中啟用變更記錄檢查點:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

您可以在現有的數據流上啟用變更記錄檢查點,並維護儲存在檢查點中的狀態資訊。

重要

已啟用變更記錄檢查點的查詢只能在 Databricks Runtime 13.3 LTS 和更新版本上執行。 您可以停用變更記錄檢查點以還原為舊版檢查點行為,但您必須繼續在 Databricks Runtime 13.3 LTS 或更新版本上執行這些查詢。 您必須重新啟動作業,才能進行這些變更。

RocksDB 狀態儲存器指標

每個狀態運算子都會收集與其 RocksDB 實例上執行之狀態管理作業相關的計量,以觀察狀態存放區,並可能協助偵錯作業速度緩慢。

特定儲存區實例的度量會透過分割區 ID 和儲存區名稱來標記,確保它們保持獨立。 在作業中,所有其他指標都會依各狀態運算符在所有執行任務中進行匯總(總和)。

這些計量是 StreamingQueryProgressstateOperators 欄位內 customMetrics 地圖的一部分。 以下是 JSON 格式的範例 StreamingQueryProgress (使用 StreamingQueryProgress.json()取得)。

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

計量的詳細描述如下:

度量名稱 描述
rocksdb提交寫入批次延遲 將階段性寫入(WriteBatch)應用於原生 RocksDB 的記憶體結構所花費的時間(毫秒)。
rocksdbCommitFlushLatency 將 RocksDB 記憶體中的變更寫入本機磁碟所花費的時間(以毫秒為單位)。
rocksdbCommitCompactLatency 在檢查點提交期間,用於壓縮的時間(毫秒)(選擇性)。
rocksdbCommitPauseLatency 停止背景工作線程(如壓縮等)的時間(以毫秒計),作為檢查點提交的一部分。
rocksdbCommitCheckpointLatency 將 RocksDB 原生快照拍攝後寫入本機目錄所耗費的時間(以毫秒計)。
rocksdbCommitFileSyncLatencyMs 將原生 RocksDB 快照集相關的檔案同步至外部存儲(檢查點位置)所需的時間(以毫秒計算)。
rocksdbGetLatency 每個基礎原生 RocksDB::Get 呼叫的平均時間(單位:奈秒)。
rocksdbPutCount 每次基礎原生 RocksDB::Put 呼叫所花的平均時間(以奈秒為單位)。
rocksdbGetCount 原生 RocksDB::Get 呼叫次數(不包括來自 WriteBatch 的 Gets - 用於預備寫入的記憶體批次)。
rocksdbPutCount 原生RocksDB::Put呼叫次數(不包括Puts至WriteBatch — 用於暫存寫入的記憶體批次)。
rocksdb通過Get讀取的總位元組數 透過原生 RocksDB::Get 呼叫讀取的未壓縮位元組數目。
rocksdb透過Put寫入的總字節數 透過原生 RocksDB::Put 呼叫寫入的未壓縮位元組數目。
rocksdbReadBlockCacheHitCount 原生 RocksDB 區塊快取被使用的次數,以避免從本機磁碟讀取數據。
rocksdbReadBlockCacheMissCount 原生 RocksDB 區塊快取遺漏且需要從本機磁碟讀取數據的次數。
rocksdb總壓縮讀取位元組數 (rocksdbTotalBytesReadByCompaction) 原生 RocksDB 壓縮程式從本機磁碟讀取的位元組數目。
rocksdb壓縮寫入的總位元組數 原生 RocksDB 壓縮程式寫入本機磁碟的位元組數目。
rocksdbTotalCompactionLatencyMs RocksDB 壓縮所花費的時間(毫秒),包括背景壓縮和在提交期間啟動的選擇性壓縮。
rocksdb寫入者延遲時間毫秒 時間(毫秒)寫入進程因背景壓縮或將內存表清除到磁碟而等待。
rocksdbTotalBytesReadThroughIterator 某些具狀態的操作(例如在flatMapGroupsWithState中進行的逾時處理或在視窗聚合中的浮水印)需要透過迭代器從資料庫中讀取整個數據。 使用迭代器讀取未壓縮資料的總大小。