Condividi tramite


Configurare l'archivio di stato RocksDB su Azure Databricks

È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente in SparkSession prima di avviare la query di streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

È possibile abilitare RocksDB nelle pipeline DLT. Vedere Ottimizzare la configurazione della pipeline per l'elaborazione con stato.

Abilitare il punto di controllo del log delle modifiche

In Databricks Runtime 13.3 LTS e versioni successive, è possibile abilitare il checkpoint del registro delle modifiche per ridurre la durata del checkpoint e la latenza end-to-end per i carichi di lavoro di Structured Streaming. Databricks raccomanda di abilitare il checkpoint del log delle modifiche per tutte le query con stato di Structured Streaming.

Tradizionalmente, gli snapshot dell'archivio stati di RocksDB vengono effettuati e i file di dati vengono caricati durante il checkpoint. Per evitare questo costo, il checkpoint del log delle modifiche scrive nella memoria persistente solo i record che sono stati modificati dall'ultimo checkpoint.

Il checkpoint del log delle modifiche è disabilitato per impostazione predefinita. È possibile abilitare il checkpoint del changelog a livello di SparkSession usando la sintassi seguente:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

È possibile abilitare il checkpoint del changelog in un flusso esistente e mantenere le informazioni sullo stato archiviate nel checkpoint.

Importante

Le query che hanno abilitato il checkpoint del log delle modifiche possono essere eseguite solo in Databricks Runtime 13.3 LTS e versioni successive. È possibile disabilitare il checkpoint del log delle modifiche per ripristinare il comportamento di checkpoint legacy, ma è necessario continuare a eseguire queste query in Databricks Runtime 13.3 LTS o versione successiva. È necessario riavviare il processo affinché queste modifiche vengano apportate.

Metriche dell'archivio stati di RocksDB

Ogni operatore di stato raccoglie le metriche correlate alle operazioni di gestione dello stato eseguite nell'istanza di RocksDB per osservare l'archivio stati e potenzialmente contribuire al debug della lentezza del processo.

Le metriche per un'istanza specifica di un archivio di stato sono etichettate con l'ID partizione e il nome dell'archivio, garantendo che rimangano separate. Tutte le altre metriche vengono aggregate (somma) per ogni operatore di stato nel processo in tutte le attività in cui è in esecuzione l'operatore di stato.

Queste metriche fanno parte della customMetrics mappa all'interno dei stateOperators campi in StreamingQueryProgress. Di seguito è riportato un esempio di StreamingQueryProgress in formato JSON (ottenuto usando StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Le descrizioni dettagliate delle metriche sono le seguenti:

Nome della metrica Descrizione
rocksdbCommitWriteBatchLatency Tempo (in millis) richiesto per applicare le scritture provvisorie nella struttura in memoria (WriteBatch) al RocksDB nativo.
rocksdbCommitFlushLatency Tempo (in millis) impiegato per scaricare le modifiche in memoria di RocksDB nel disco locale.
rocksdbCommitCompactLatency Tempo impiegato (in millisecondi, se facoltativo) per la compattazione durante il commit del checkpoint.
rocksdbCommitPauseLatency Tempo (in millisecondi) impiegato per arrestare i thread di elaborazione in background (per la compattazione e così via) nell'ambito del commit del checkpoint.
rocksdbCommitCheckpointLatency Tempo (in millisecondi) necessario per effettuare uno snapshot del RocksDB nativo e scriverlo in una directory locale.
rocksdbCommitFileSyncLatencyMs Tempo (in millisecondi) impiegato per la sincronizzazione dei file relativi allo snapshot nativo di RocksDB in un'archiviazione esterna (posizione del checkpoint).
rocksdbGetLatency Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Get sottostante.
rocksdbPutCount Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Put sottostante.
rocksdbGetCount Numero di chiamate native RocksDB::Get (non include Gets writeBatch - batch in memoria usato per le scritture di Staging).
rocksdbPutCount Numero di chiamate native RocksDB::Put (non include Puts in batch di memoria WriteBatch usato per le scritture di staging).
rocksdbTotalBytesReadByGet Numero di byte non compressi letti attraverso chiamate native RocksDB::Get.
rocksdbTotalBytesWrittenByPut (totale byte scritti da Put in rocksdb) Numero di byte non compressi scritti tramite chiamate native RocksDB::Put .
rocksdbReadBlockCacheHitCount Numero di volte in cui viene usata la cache a blocchi di RocksDB nativa per evitare la lettura dei dati dal disco locale.
ConteggioMancanzaCacheLetturaRocksdb Numero di volte in cui la cache nativa del blocco di RocksDB ha mancato e ha richiesto la lettura dei dati dal disco locale.
Bytes totali letti dalla compattazione di rocksdb Numero di byte letti dal disco locale dal processo di compattazione di RocksDB nativo.
rocksdbTotalBytesWrittenByCompaction Numero di byte scritti nel disco locale dal processo di compattazione di RocksDB nativo.
rocksdbTotalCompactionLatencyMs Tempo (in millisecondi) impiegato per le compattazioni di RocksDB (sia quelle in background che quelle facoltative avviate durante il commit).
rocksdbWriterStallLatencyMs Tempo (in millisecondi) il processo di scrittura si è bloccato a causa di una compattazione in background o dello svuotamento delle memtable su disco.
rocksdbTotalBytesReadThroughIterator Alcune delle operazioni con stato (ad esempio l'elaborazione del timeout o la gestione dei watermark nelle aggregazioni di finestre di flatMapGroupsWithState) richiedono la lettura dell'intero set di dati nel database tramite iteratore. Dimensioni totali dei dati non compressi letti usando l'iteratore.