다음을 통해 공유


Azure Databricks에서 RocksDB 상태 저장소 구성

스트리밍 쿼리를 시작하기 전에 SparkSession에서 다음 구성을 설정하여 RocksDB 기반 상태 관리를 사용하도록 설정할 수 있습니다.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Delta Live Tables 파이프라인에서 RocksDB를 사용하도록 설정할 수 있습니다. 상태 저장 처리의 파이프라인 구성 최적화 ()를 참조하세요.

변경 로그 검사점 사용

Databricks Runtime 13.3 LTS 이상에서 구조적 스트리밍 워크로드에 대한 검사점 기간 및 엔드 투 엔드 대기 시간을 낮추도록 변경 로그 검사점을 사용하도록 설정할 수 있습니다. Databricks는 모든 구조적 스트리밍 상태 저장 쿼리에 대해 changelog 검사포인트를 사용하도록 설정할 것을 권합니다.

일반적으로 RocksDB 상태 저장소는 검사점이 있는 동안 데이터 파일을 스냅샷하고 업로드합니다. 이 비용을 방지하기 위해 변경 로그 검사점은 마지막 검사점 이후 변경된 레코드만 지속성 스토리지에 기록합니다."

변경 로그 검사점은 기본적으로 사용하지 않도록 설정됩니다. 다음 구문을 사용하여 SparkSession 수준에서 변경 로그 검사점을 사용하도록 설정할 수 있습니다.

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

기존 스트림에서 변경 로그 검사점을 사용하도록 설정하고 검사점에 저장된 상태 정보를 유지할 수 있습니다.

Important

변경 로그 검사점을 사용하도록 설정한 쿼리는 Databricks Runtime 13.3 LTS 이상에서만 실행할 수 있습니다. 변경 로그 검사점을 사용하지 않도록 설정하여 레거시 검사점 동작으로 되돌릴 수 있지만 Databricks Runtime 13.3 LTS 이상에서 이러한 쿼리를 계속 실행해야 합니다. 이러한 변경이 수행되려면 작업을 다시 시작해야 합니다.

RocksDB 상태 저장소 메트릭

각 상태 연산자는 해당 RocksDB 인스턴스에서 수행된 상태 관리 작업과 관련된 메트릭을 수집하여 상태 저장소를 관찰하고 작업 속도 저하 문제의 디버그를 지원할 수 있습니다. 이러한 메트릭은 작업 중 상태 연산자가 실행되는 모든 작업에서 각 상태 연산자당 집계(합계)됩니다. 이러한 메트릭은 customMetricsstateOperators 필드 내 StreamingQueryProgress 맵의 일부입니다. 다음은 JSON 형식의 StreamingQueryProgress 예제입니다(StreamingQueryProgress.json()을 사용하여 가져옴).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

메트릭에 대한 자세한 설명은 다음과 같습니다.

메트릭 이름 설명
rocksdbCommitWriteBatchLatency 메모리 내 구조체(WriteBatch)에 스테이징된 쓰기를 네이티브 RocksDB에 적용하는 데 걸린 시간(밀리초)입니다.
rocksdbCommitFlushLatency RocksDB 메모리 내 변경 내용을 로컬 디스크로 플러시하는 데 걸린 시간(밀리초)입니다.
rocksdbCommitCompactLatency 검사점 커밋 중 압축(선택 사항)에 걸린 시간(밀리초)입니다.
rocksdbCommitPauseLatency 검사점 커밋의 일부로 압축 등을 위해 백그라운드 작업자 스레드를 중지하는 데 걸린 시간(밀리초)입니다.
rocksdbCommitCheckpointLatency 네이티브 RocksDB 스냅샷을 만들고 로컬 디렉터리에 쓰는 데 걸린 시간(밀리초)입니다.
rocksdbCommitFileSyncLatencyMs 네이티브 RocksDB 스냅샷 관련 파일을 외부 스토리지(검사점 위치)에 동기화하는 데 걸린 시간(밀리초)입니다.
rocksdbGetLatency 기본 네이티브 RocksDB::Get 호출당 평균 소요 시간(나노초)입니다.
rocksdbPutCount 기본 네이티브 RocksDB::Put 호출당 평균 소요 시간(나노초)입니다.
rocksdbGetCount 네이티브 RocksDB::Get 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에서의 Gets은 포함되지 않음).
rocksdbPutCount 네이티브 RocksDB::Put 호출 수입니다(쓰기 스테이징에 사용되는 메모리 내 일괄 처리인 WriteBatch에 대한 Puts은 포함되지 않음).
rocksdbTotalBytesReadByGet 네이티브 RocksDB::Get 호출을 통해 읽은, 압축되지 않은 바이트 수입니다.
rocksdbTotalBytesWrittenByPut 네이티브 RocksDB::Put 호출을 통해 쓴, 압축되지 않은 바이트 수입니다.
rocksdbReadBlockCacheHitCount 로컬 디스크에서 데이터를 읽지 않도록 네이티브 RocksDB 블록 캐시가 사용된 횟수입니다.
rocksdbReadBlockCacheMissCount 네이티브 RocksDB 블록 캐시가 누락되어 로컬 디스크에서 데이터를 읽어야 했던 횟수입니다.
rocksdbTotalBytesReadByCompaction 네이티브 RocksDB 압축 프로세스가 로컬 디스크에서 읽은 바이트 수입니다.
rocksdbTotalBytesWrittenByCompaction 네이티브 RocksDB 압축 프로세스가 로컬 디스크에 쓴 바이트 수입니다.
rocksdbTotalCompactionLatencyMs RocksDB 압축(커밋 중에 시작된 선택적 압축과 백그라운드 압축 포함)에 걸린 시간(밀리초)입니다.
rocksdbWriterStallLatencyMs 백그라운드 압축 또는 memtable을 디스크로 플러시하는 작업으로 인해 기록기가 중단된 시간(밀리초)입니다.
rocksdbTotalBytesReadThroughIterator 일부 상태 저장 작업(예: flatMapGroupsWithState의 시간 제한 처리 또는 기간 이동 집계의 워터마크 처리)은 반복기를 통해 DB의 전체 데이터를 읽어야 합니다. 반복기를 사용하여 읽은, 압축되지 않은 데이터의 총 크기입니다.