Configurare l'archivio di stato RocksDB su Azure Databricks
È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente in SparkSession prima di avviare la query di streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
È possibile abilitare RocksDB nelle pipeline DLT. Vedere Ottimizzare la configurazione della pipeline per l'elaborazione con stato.
Abilitare il punto di controllo del log delle modifiche
In Databricks Runtime 13.3 LTS e versioni successive, è possibile abilitare il checkpoint del registro delle modifiche per ridurre la durata del checkpoint e la latenza end-to-end per i carichi di lavoro di Structured Streaming. Databricks raccomanda di abilitare il checkpoint del log delle modifiche per tutte le query con stato di Structured Streaming.
Tradizionalmente, gli snapshot dell'archivio stati di RocksDB vengono effettuati e i file di dati vengono caricati durante il checkpoint. Per evitare questo costo, il checkpoint del log delle modifiche scrive nella memoria persistente solo i record che sono stati modificati dall'ultimo checkpoint.
Il checkpoint del log delle modifiche è disabilitato per impostazione predefinita. È possibile abilitare il checkpoint del changelog a livello di SparkSession usando la sintassi seguente:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
È possibile abilitare il checkpoint del changelog in un flusso esistente e mantenere le informazioni sullo stato archiviate nel checkpoint.
Importante
Le query che hanno abilitato il checkpoint del log delle modifiche possono essere eseguite solo in Databricks Runtime 13.3 LTS e versioni successive. È possibile disabilitare il checkpoint del log delle modifiche per ripristinare il comportamento di checkpoint legacy, ma è necessario continuare a eseguire queste query in Databricks Runtime 13.3 LTS o versione successiva. È necessario riavviare il processo affinché queste modifiche vengano apportate.
Metriche dell'archivio stati di RocksDB
Ogni operatore di stato raccoglie le metriche correlate alle operazioni di gestione dello stato eseguite nell'istanza di RocksDB per osservare l'archivio stati e potenzialmente contribuire al debug della lentezza del processo.
Le metriche per un'istanza specifica di un archivio di stato sono etichettate con l'ID partizione e il nome dell'archivio, garantendo che rimangano separate. Tutte le altre metriche vengono aggregate (somma) per ogni operatore di stato nel processo in tutte le attività in cui è in esecuzione l'operatore di stato.
Queste metriche fanno parte della customMetrics
mappa all'interno dei stateOperators
campi in StreamingQueryProgress
. Di seguito è riportato un esempio di StreamingQueryProgress
in formato JSON (ottenuto usando StreamingQueryProgress.json()
).
{
"id": "6774075e-8869-454b-ad51-513be86cfd43",
"runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId": 7,
"stateOperators": [
{
"numRowsTotal": 20000000,
"numRowsUpdated": 20000000,
"memoryUsedBytes": 31005397,
"numRowsDroppedByWatermark": 0,
"customMetrics": {
"SnapshotLastUploaded.partition_0_default": 7,
"SnapshotLastUploaded.partition_1_default": 7,
"SnapshotLastUploaded.partition_2_default": 6,
"SnapshotLastUploaded.partition_3_default": 6,
"SnapshotLastUploaded.partition_4_default": -1,
"rocksdbBytesCopied": 141037747,
"rocksdbCommitCheckpointLatency": 2,
"rocksdbCommitCompactLatency": 22061,
"rocksdbCommitFileSyncLatencyMs": 1710,
"rocksdbCommitFlushLatency": 19032,
"rocksdbCommitPauseLatency": 0,
"rocksdbCommitWriteBatchLatency": 56155,
"rocksdbFilesCopied": 2,
"rocksdbFilesReused": 0,
"rocksdbGetCount": 40000000,
"rocksdbGetLatency": 21834,
"rocksdbPutCount": 1,
"rocksdbPutLatency": 56155599000,
"rocksdbReadBlockCacheHitCount": 1988,
"rocksdbReadBlockCacheMissCount": 40341617,
"rocksdbSstFileSize": 141037747,
"rocksdbTotalBytesReadByCompaction": 336853375,
"rocksdbTotalBytesReadByGet": 680000000,
"rocksdbTotalBytesReadThroughIterator": 0,
"rocksdbTotalBytesWrittenByCompaction": 141037747,
"rocksdbTotalBytesWrittenByPut": 740000012,
"rocksdbTotalCompactionLatencyMs": 21949695000,
"rocksdbWriterStallLatencyMs": 0,
"rocksdbZipFileBytesUncompressed": 7038
}
}
],
"sources": [{}],
"sink": {}
}
Le descrizioni dettagliate delle metriche sono le seguenti:
Nome della metrica | Descrizione |
---|---|
rocksdbCommitWriteBatchLatency | Tempo (in millis) richiesto per applicare le scritture provvisorie nella struttura in memoria (WriteBatch) al RocksDB nativo. |
rocksdbCommitFlushLatency | Tempo (in millis) impiegato per scaricare le modifiche in memoria di RocksDB nel disco locale. |
rocksdbCommitCompactLatency | Tempo impiegato (in millisecondi, se facoltativo) per la compattazione durante il commit del checkpoint. |
rocksdbCommitPauseLatency | Tempo (in millisecondi) impiegato per arrestare i thread di elaborazione in background (per la compattazione e così via) nell'ambito del commit del checkpoint. |
rocksdbCommitCheckpointLatency | Tempo (in millisecondi) necessario per effettuare uno snapshot del RocksDB nativo e scriverlo in una directory locale. |
rocksdbCommitFileSyncLatencyMs | Tempo (in millisecondi) impiegato per la sincronizzazione dei file relativi allo snapshot nativo di RocksDB in un'archiviazione esterna (posizione del checkpoint). |
rocksdbGetLatency | Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Get sottostante. |
rocksdbPutCount | Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Put sottostante. |
rocksdbGetCount | Numero di chiamate native RocksDB::Get (non include Gets writeBatch - batch in memoria usato per le scritture di Staging). |
rocksdbPutCount | Numero di chiamate native RocksDB::Put (non include Puts in batch di memoria WriteBatch usato per le scritture di staging). |
rocksdbTotalBytesReadByGet | Numero di byte non compressi letti attraverso chiamate native RocksDB::Get . |
rocksdbTotalBytesWrittenByPut (totale byte scritti da Put in rocksdb) | Numero di byte non compressi scritti tramite chiamate native RocksDB::Put . |
rocksdbReadBlockCacheHitCount | Numero di volte in cui viene usata la cache a blocchi di RocksDB nativa per evitare la lettura dei dati dal disco locale. |
ConteggioMancanzaCacheLetturaRocksdb | Numero di volte in cui la cache nativa del blocco di RocksDB ha mancato e ha richiesto la lettura dei dati dal disco locale. |
Bytes totali letti dalla compattazione di rocksdb | Numero di byte letti dal disco locale dal processo di compattazione di RocksDB nativo. |
rocksdbTotalBytesWrittenByCompaction | Numero di byte scritti nel disco locale dal processo di compattazione di RocksDB nativo. |
rocksdbTotalCompactionLatencyMs | Tempo (in millisecondi) impiegato per le compattazioni di RocksDB (sia quelle in background che quelle facoltative avviate durante il commit). |
rocksdbWriterStallLatencyMs | Tempo (in millisecondi) il processo di scrittura si è bloccato a causa di una compattazione in background o dello svuotamento delle memtable su disco. |
rocksdbTotalBytesReadThroughIterator | Alcune delle operazioni con stato (ad esempio l'elaborazione del timeout o la gestione dei watermark nelle aggregazioni di finestre di flatMapGroupsWithState ) richiedono la lettura dell'intero set di dati nel database tramite iteratore. Dimensioni totali dei dati non compressi letti usando l'iteratore. |