在 Azure Databricks 設定 RocksDB 狀態存放區
您可以在啟動串流查詢之前,先在SparkSession 中設定下列設定,以啟用RocksDB型狀態管理。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
您可以在 DLT 管線上啟用 RocksDB。 請參閱 優化管線組態以進行具狀態處理。
啟用變更記錄檢查點
在 Databricks Runtime 13.3 LTS 和更新版本中,您可以啟用變更記錄檢查點,以降低結構化串流工作負載的檢查點持續時間和端對端延遲。 Databricks 建議為所有具狀態的結構化串流查詢啟用變更記錄檢查點。
傳統上,RocksDB State 存儲區會在檢查點期間快照並上傳資料檔案。 為了避免這項成本,changelog 檢查點檢查僅將自上次檢查點後變更的記錄寫入永久儲存。
預設停用變更日誌檢查點。 您可以使用下列語法,在 SparkSession 層級中啟用變更記錄檢查點:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
您可以在現有的數據流上啟用變更記錄檢查點,並維護儲存在檢查點中的狀態資訊。
重要
已啟用變更記錄檢查點的查詢只能在 Databricks Runtime 13.3 LTS 和更新版本上執行。 您可以停用變更記錄檢查點以還原為舊版檢查點行為,但您必須繼續在 Databricks Runtime 13.3 LTS 或更新版本上執行這些查詢。 您必須重新啟動作業,才能進行這些變更。
RocksDB 狀態儲存器指標
每個狀態運算子都會收集與其 RocksDB 實例上執行之狀態管理作業相關的計量,以觀察狀態存放區,並可能協助偵錯作業速度緩慢。
特定儲存區實例的度量會透過分割區 ID 和儲存區名稱來標記,確保它們保持獨立。 在作業中,所有其他指標都會依各狀態運算符在所有執行任務中進行匯總(總和)。
這些計量是 StreamingQueryProgress
中 stateOperators
欄位內 customMetrics
地圖的一部分。 以下是 JSON 格式的範例 StreamingQueryProgress
(使用 StreamingQueryProgress.json()
取得)。
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
計量的詳細描述如下:
度量名稱 | 描述 |
---|---|
rocksdb提交寫入批次延遲 | 將階段性寫入(WriteBatch)應用於原生 RocksDB 的記憶體結構所花費的時間(毫秒)。 |
rocksdbCommitFlushLatency | 將 RocksDB 記憶體中的變更寫入本機磁碟所花費的時間(以毫秒為單位)。 |
rocksdbCommitCompactLatency | 在檢查點提交期間,用於壓縮的時間(毫秒)(選擇性)。 |
rocksdbCommitPauseLatency | 停止背景工作線程(如壓縮等)的時間(以毫秒計),作為檢查點提交的一部分。 |
rocksdbCommitCheckpointLatency | 將 RocksDB 原生快照拍攝後寫入本機目錄所耗費的時間(以毫秒計)。 |
rocksdbCommitFileSyncLatencyMs | 將原生 RocksDB 快照集相關的檔案同步至外部存儲(檢查點位置)所需的時間(以毫秒計算)。 |
rocksdbGetLatency | 每個基礎原生 RocksDB::Get 呼叫的平均時間(單位:奈秒)。 |
rocksdbPutCount | 每次基礎原生 RocksDB::Put 呼叫所花的平均時間(以奈秒為單位)。 |
rocksdbGetCount | 原生 RocksDB::Get 呼叫次數(不包括來自 WriteBatch 的 Gets - 用於預備寫入的記憶體批次)。 |
rocksdbPutCount | 原生RocksDB::Put 呼叫次數(不包括Puts 至WriteBatch — 用於暫存寫入的記憶體批次)。 |
rocksdb通過Get讀取的總位元組數 | 透過原生 RocksDB::Get 呼叫讀取的未壓縮位元組數目。 |
rocksdb透過Put寫入的總字節數 | 透過原生 RocksDB::Put 呼叫寫入的未壓縮位元組數目。 |
rocksdbReadBlockCacheHitCount | 原生 RocksDB 區塊快取被使用的次數,以避免從本機磁碟讀取數據。 |
rocksdbReadBlockCacheMissCount | 原生 RocksDB 區塊快取遺漏且需要從本機磁碟讀取數據的次數。 |
rocksdb總壓縮讀取位元組數 (rocksdbTotalBytesReadByCompaction) | 原生 RocksDB 壓縮程式從本機磁碟讀取的位元組數目。 |
rocksdb壓縮寫入的總位元組數 | 原生 RocksDB 壓縮程式寫入本機磁碟的位元組數目。 |
rocksdbTotalCompactionLatencyMs | RocksDB 壓縮所花費的時間(毫秒),包括背景壓縮和在提交期間啟動的選擇性壓縮。 |
rocksdb寫入者延遲時間毫秒 | 時間(毫秒)寫入進程因背景壓縮或將內存表清除到磁碟而等待。 |
rocksdbTotalBytesReadThroughIterator | 某些具狀態的操作(例如在flatMapGroupsWithState 中進行的逾時處理或在視窗聚合中的浮水印)需要透過迭代器從資料庫中讀取整個數據。 使用迭代器讀取未壓縮資料的總大小。 |