Azure Databricks에서 RocksDB 상태 저장소 구성
스트리밍 쿼리를 시작하기 전에 SparkSession에서 다음 구성을 설정하여 RocksDB 기반 상태 관리를 사용하도록 설정할 수 있습니다.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
Delta Live Tables 파이프라인에서 RocksDB를 사용하도록 설정할 수 있습니다. 상태 저장 처리의 파이프라인 구성 최적화 ()를 참조하세요.
변경 로그 검사점 사용
Databricks Runtime 13.3 LTS 이상에서 구조적 스트리밍 워크로드에 대한 검사점 기간 및 엔드 투 엔드 대기 시간을 낮추도록 변경 로그 검사점을 사용하도록 설정할 수 있습니다. Databricks는 모든 구조적 스트리밍 상태 저장 쿼리에 대해 changelog 검사포인트를 사용하도록 설정할 것을 권합니다.
일반적으로 RocksDB 상태 저장소는 검사점이 있는 동안 데이터 파일을 스냅샷하고 업로드합니다. 이 비용을 방지하기 위해 변경 로그 검사점은 마지막 검사점 이후 변경된 레코드만 지속성 스토리지에 기록합니다."
변경 로그 검사점은 기본적으로 사용하지 않도록 설정됩니다. 다음 구문을 사용하여 SparkSession 수준에서 변경 로그 검사점을 사용하도록 설정할 수 있습니다.
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
기존 스트림에서 변경 로그 검사점을 사용하도록 설정하고 검사점에 저장된 상태 정보를 유지할 수 있습니다.
Important
변경 로그 검사점을 사용하도록 설정한 쿼리는 Databricks Runtime 13.3 LTS 이상에서만 실행할 수 있습니다. 변경 로그 검사점을 사용하지 않도록 설정하여 레거시 검사점 동작으로 되돌릴 수 있지만 Databricks Runtime 13.3 LTS 이상에서 이러한 쿼리를 계속 실행해야 합니다. 이러한 변경이 수행되려면 작업을 다시 시작해야 합니다.
RocksDB 상태 저장소 메트릭
각 상태 연산자는 해당 RocksDB 인스턴스에서 수행된 상태 관리 작업과 관련된 메트릭을 수집하여 상태 저장소를 관찰하고 작업 속도 저하 문제의 디버그를 지원할 수 있습니다. 이러한 메트릭은 작업 중 상태 연산자가 실행되는 모든 작업에서 각 상태 연산자당 집계(합계)됩니다. 이러한 메트릭은 customMetrics
의 stateOperators
필드 내 StreamingQueryProgress
맵의 일부입니다. 다음은 JSON 형식의 StreamingQueryProgress
예제입니다(StreamingQueryProgress.json()
을 사용하여 가져옴).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
메트릭에 대한 자세한 설명은 다음과 같습니다.
메트릭 이름 | 설명 |
---|---|
rocksdbCommitWriteBatchLatency | 메모리 내 구조체(WriteBatch)에 스테이징된 쓰기를 네이티브 RocksDB에 적용하는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitFlushLatency | RocksDB 메모리 내 변경 내용을 로컬 디스크로 플러시하는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitCompactLatency | 검사점 커밋 중 압축(선택 사항)에 걸린 시간(밀리초)입니다. |
rocksdbCommitPauseLatency | 검사점 커밋의 일부로 압축 등을 위해 백그라운드 작업자 스레드를 중지하는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitCheckpointLatency | 네이티브 RocksDB 스냅샷을 만들고 로컬 디렉터리에 쓰는 데 걸린 시간(밀리초)입니다. |
rocksdbCommitFileSyncLatencyMs | 네이티브 RocksDB 스냅샷 관련 파일을 외부 스토리지(검사점 위치)에 동기화하는 데 걸린 시간(밀리초)입니다. |
rocksdbGetLatency | 기본 네이티브 RocksDB::Get 호출당 평균 소요 시간(나노초)입니다. |
rocksdbPutCount | 기본 네이티브 RocksDB::Put 호출당 평균 소요 시간(나노초)입니다. |
rocksdbGetCount | 네이티브 RocksDB::Get 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에서의 Gets 은 포함되지 않음). |
rocksdbPutCount | 네이티브 RocksDB::Put 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에 대한 Puts 은 포함되지 않음). |
rocksdbTotalBytesReadByGet | 네이티브 RocksDB::Get 호출을 통해 읽은, 압축되지 않은 바이트 수입니다. |
rocksdbTotalBytesWrittenByPut | 네이티브 RocksDB::Put 호출을 통해 쓴, 압축되지 않은 바이트 수입니다. |
rocksdbReadBlockCacheHitCount | 로컬 디스크에서 데이터를 읽지 않도록 네이티브 RocksDB 블록 캐시가 사용된 횟수입니다. |
rocksdbReadBlockCacheMissCount | 네이티브 RocksDB 블록 캐시가 누락되어 로컬 디스크에서 데이터를 읽어야 했던 횟수입니다. |
rocksdbTotalBytesReadByCompaction | 네이티브 RocksDB 압축 프로세스가 로컬 디스크에서 읽은 바이트 수입니다. |
rocksdbTotalBytesWrittenByCompaction | 네이티브 RocksDB 압축 프로세스가 로컬 디스크에 쓴 바이트 수입니다. |
rocksdbTotalCompactionLatencyMs | RocksDB 압축(커밋 중에 시작된 선택적 압축과 백그라운드 압축 포함)에 걸린 시간(밀리초)입니다. |
rocksdbWriterStallLatencyMs | 백그라운드 압축 또는 memtable을 디스크로 플러시하는 작업으로 인해 기록기가 중단된 시간(밀리초)입니다. |
rocksdbTotalBytesReadThroughIterator | 일부 상태 저장 작업(예: flatMapGroupsWithState 의 시간 제한 처리 또는 기간 이동 집계의 워터마크 처리)은 반복기를 통해 DB의 전체 데이터를 읽어야 합니다. 반복기를 사용하여 읽은, 압축되지 않은 데이터의 총 크기입니다. |