Sdílet prostřednictvím


Konfigurace úložiště stavů RocksDB v Azure Databricks

Správu stavu na základě RocksDB můžete povolit nastavením následující konfigurace v SparkSession před spuštěním dotazu streamování.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Můžete povolit RocksDB na pipelinech Delta Live Tables. Prohlédněte si konfiguraci potrubí Optimize pro stavové zpracování.

Povolení vytváření kontrolních bodů protokolu změn

Ve službě Databricks Runtime 13.3 LTS a novějších můžete povolit kontrolní body protokolu změn, aby se snížila doba trvání kontrolního bodu a celková latence pro úlohy strukturovaného streamování. Databricks doporučuje zapnout kontrolní body protokolu změn pro všechny stavové dotazy strukturovaného streamování.

Tradičně rocksDB State Store snímky a nahrává datové soubory během vytváření kontrolních bodů. Pokud se chcete těmto nákladům vyhnout, kontrolní body protokolu změn zapisují pouze záznamy, které se změnily od posledního kontrolního bodu do odolného úložiště.

Vytváření kontrolních bodů protokolu změn je ve výchozím nastavení zakázané. Kontrolní body protokolu změn můžete povolit na úrovni SparkSession pomocí následující syntaxe:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

U existujícího datového proudu můžete povolit kontrolní body protokolu změn a zachovat informace o stavu uložené v kontrolním bodu.

Důležité

Dotazy, které povolily vytváření kontrolních bodů protokolu změn, je možné spouštět pouze v Databricks Runtime 13.3 LTS a vyšší. Pokud se chcete vrátit ke staršímu chování kontrolních bodů, můžete zakázat vytváření kontrolních bodů protokolu změn, ale tyto dotazy musíte dál spouštět ve službě Databricks Runtime 13.3 LTS nebo vyšší. Aby se tyto změny projevily, musíte úlohu restartovat.

Metriky úložiště stavu RocksDB

Každý operátor stavu shromažďuje metriky související s operacemi správy stavu provedenými ve své instanci RocksDB, aby bylo možné sledovat úložiště stavů a potenciálně pomoct s zpomalením úlohy ladění. Tyto metriky se sčítají pro každý operátor stavu v úloze napříč všemi úkoly, kde where je operátor stavu spuštěný. Tyto metriky jsou součástí customMetrics mapy uvnitř stateOperators polí v StreamingQueryProgress. Následuje příklad StreamingQueryProgress ve formátu JSON (získaném pomocí StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Podrobné popisy metrik jsou následující:

Název metriky Popis
rocksdbCommitWriteBatchLatency Doba (v milis) trvala použití fázovaných zápisů ve struktuře v paměti (WriteBatch) na nativní RocksDB.
rocksdbCommitFlushLatency Čas (v milis) trvalo vyprázdnění změn v paměti RocksDB na místní disk.
rocksdbCommitCompactLatency Čas (v milis) trvalo komprimace (volitelné) během potvrzení kontrolního bodu.
rocksdbCommitPauseLatency Čas (v milis) trvalo zastavení pracovních vláken na pozadí (pro komprimace atd.) v rámci potvrzení kontrolního bodu.
rocksdbCommitCheckpointLatency Čas (v milis) trvalo pořízení snímku nativní rocksDB a jeho zápis do místního adresáře.
rocksdbCommitFileSyncLatencyMs Čas (v milis) trvalo synchronizaci nativních souborů souvisejících se snímky RocksDB s externím úložištěm (umístění kontrolního bodu).
rocksdbGetLatency Průměrná doba (v nanos) trvala podle základního nativního RocksDB::Get volání.
rocksdbPutCount Průměrná doba (v nanos) trvala podle základního nativního RocksDB::Put volání.
rocksdbGetCount Počet nativních RocksDB::Get volání (nezahrnuje Gets WriteBatch – v dávce paměti používané pro přípravné zápisy).
rocksdbPutCount Počet nativních RocksDB::Put volání (nezahrnuje Puts WriteBatch – v dávce paměti používané pro přípravné zápisy).
rocksdbTotalBytesReadByGet Počet nekomprimovaných bajtů přečtených prostřednictvím nativních RocksDB::Get volání.
rocksdbTotalBytesWrittenByPut Počet nekomprimovaných bajtů zapsaných prostřednictvím nativních RocksDB::Put volání
rocksdbReadBlockCacheHitCount Počet použití nativní mezipaměti bloků RocksDB k tomu, aby se zabránilo čtení dat z místního disku.
rocksdbReadBlockCacheMissCount Kolikrát nativní blok RocksDB zmeškala mezipaměť a vyžadovala čtení dat z místního disku.
rocksdbTotalBytesReadByCompaction Počet bajtů načtených z místního disku nativním procesem komprimace RocksDB
rocksdbTotalBytesWrittenByCompaction Počet bajtů zapsaných na místní disk nativním procesem komprimace RocksDB
rocksdbTotalCompactionLatencyMs Čas (v milis) trvalo komprimace RocksDB (pozadí i volitelné komprimace zahájené během potvrzení).
rocksdbWriterStallLatencyMs Čas (v milis) zapisovač se zastavil kvůli komprimování pozadí nebo vyprázdnění memtables na disk.
rocksdbTotalBytesReadThroughIterator Některé stavové operace (například zpracování časového limitu v flatMapGroupsWithState agregaci s okny nebo vodoznaky) vyžadují čtení celých dat v databázi prostřednictvím iterátoru. Celková velikost nekomprimovaných dat přečtených pomocí iterátoru.