Partilhar via


Configurar o armazenamento de estado do RocksDB no Azure Databricks

Você pode habilitar o gerenciamento de estado baseado em RocksDB definindo a seguinte configuração no SparkSession antes de iniciar a consulta de streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Você pode habilitar o RocksDB em pipelines Delta Live Tables. Consulte Otimizar a configuração do pipeline para processamento com monitoração de estado.

Ativar o ponto de verificação do registo de alterações

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho do Structured Streaming. O Databricks recomenda ativar o ponto de verificação do registo de alterações para todas as consultas com monitorização de estado de Transmissão em Fluxo Estruturada.

Tradicionalmente, o RocksDB State Store faz snapshots e carrega arquivos de dados durante o checkpoint. Para evitar esse custo, o ponto de verificação do changelog grava apenas os registros que foram alterados desde o último ponto de verificação para armazenamento durável."

O ponto de verificação do changelog está desativado por padrão. Você pode habilitar o ponto de verificação do changelog no nível SparkSession usando a seguinte sintaxe:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Você pode habilitar o ponto de verificação do changelog em um fluxo existente e manter as informações de estado armazenadas no ponto de verificação.

Importante

As consultas que habilitaram o ponto de verificação do changelog só podem ser executadas no Databricks Runtime 13.3 LTS e superior. Você pode desabilitar o ponto de verificação do changelog para reverter para o comportamento de ponto de verificação herdado, mas deve continuar a executar essas consultas no Databricks Runtime 13.3 LTS ou superior. Você deve reiniciar o trabalho para que essas alterações ocorram.

Métricas do armazenamento de estado do RocksDB

Cada operador de estado coleta métricas relacionadas às operações de gerenciamento de estado executadas em sua instância RocksDB para observar o armazenamento de estado e potencialmente ajudar na depuração da lentidão do trabalho. Essas métricas são agregadas (soma) por operador de estado no trabalho em todas as tarefas em que o operador de estado está sendo executado. Essas métricas fazem parte do customMetrics mapa dentro dos stateOperators campos em StreamingQueryProgress. A seguir está um exemplo de StreamingQueryProgress no formulário JSON (obtido usando StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

As descrições detalhadas das métricas são as seguintes:

Nome da métrica Description
rocksdbCommitWriteBatchLatency Tempo (em milis) levou para aplicar as gravações em estágios na estrutura na memória (WriteBatch) ao RocksDB nativo.
rocksdbCommitFlushLatency Tempo (em milis) necessário para liberar as alterações na memória do RocksDB para o disco local.
rocksdbCommitCompactLatency Tempo (em milis) levado para compactação (opcional) durante a confirmação do ponto de verificação.
rocksdbCommitPauseLatency Tempo (em milis) necessário para parar os threads de trabalho em segundo plano (para compactação, etc.) como parte da confirmação do ponto de verificação.
rocksdbCommitCheckpointLatency Tempo (em milis) levou para tirar um instantâneo do RocksDB nativo e gravá-lo em um diretório local.
rocksdbCommitFileSyncLatencyMs Tempo (em milis) necessário para sincronizar os arquivos nativos relacionados ao snapshot do RocksDB com um armazenamento externo (local do ponto de verificação).
rocksdbGetLatency Tempo médio (em nanos) levado por chamada nativa RocksDB::Get subjacente.
rocksdbPutCount Tempo médio (em nanos) levado por chamada nativa RocksDB::Put subjacente.
rocksdbGetCount Número de chamadas nativas RocksDB::Get (não inclui Gets de WriteBatch - no lote de memória usado para gravações de preparação).
rocksdbPutCount Número de chamadas nativas RocksDB::Put (não inclui Puts WriteBatch - no lote de memória usado para gravações de preparação).
rocksdbTotalBytesReadByGet Número de bytes não compactados lidos através de chamadas nativas RocksDB::Get .
rocksdbTotalBytesWrittenByPut Número de bytes não compactados gravados por meio de chamadas nativas RocksDB::Put .
rocksdbReadBlockCacheHitCount Número de vezes que o cache de bloco nativo do RocksDB é usado para evitar a leitura de dados do disco local.
rocksdbReadBlockCacheMissCount Número de vezes que o cache de bloco nativo do RocksDB perdeu e exigiu a leitura de dados do disco local.
rocksdbTotalBytesReadByCompaction Número de bytes lidos do disco local pelo processo de compactação nativo do RocksDB.
rocksdbTotalBytesWrittenByCompaction Número de bytes gravados no disco local pelo processo de compactação nativo do RocksDB.
rocksdbTotalCompactionLatencyMs Tempo (em milis) levado para compactações RocksDB (tanto o fundo quanto a compactação opcional iniciada durante a confirmação).
rocksdbWriterStallLatencyMs O tempo (em milis) do gravador estagnou devido a uma compactação de fundo ou lavagem dos memtables para o disco.
rocksdbTotalBytesReadThroughIterator Algumas das operações com monitoração de estado (como processamento de tempo limite ou flatMapGroupsWithState marca d'água em agregações em janela) exigem a leitura de dados inteiros no banco de dados por meio do iterador. O tamanho total dos dados não compactados lidos usando o iterador.