Sdílet prostřednictvím


Konfigurace úložiště stavů RocksDB v Azure Databricks

Správu stavu na základě RocksDB můžete povolit nastavením následující konfigurace v SparkSession před spuštěním dotazu streamování.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

RocksDB můžete povolit v kanálech DLT. Vizte Optimalizujte konfiguraci pipeliny pro stavové zpracování.

Povolení vytváření kontrolních bodů protokolu změn

Ve službě Databricks Runtime 13.3 LTS a novějších můžete povolit kontrolní body protokolu změn, aby se snížila doba trvání kontrolního bodu a celková latence pro úlohy strukturovaného streamování. Databricks doporučuje povolit kontrolu verzí pro všechny stavové dotazy v rámci strukturovaného streamování.

Tradičně RocksDB State Store vytváří snímky a nahrává datové soubory během provádění kontrolních bodů. Pokud se chcete těmto nákladům vyhnout, kontrolní body záznamu změn zapisují do trvanlivého úložiště pouze záznamy, které se změnily od posledního kontrolního bodu.

Ve výchozím nastavení je vytváření kontrolních bodů protokolu změn zakázáno. Kontrolní body protokolu změn můžete povolit na úrovni SparkSession pomocí následující syntaxe:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

U existujícího datového proudu můžete povolit kontrolní body protokolu změn a udržovat informace o stavu uložené v kontrolním bodu.

Důležité

Dotazy, kde je zapnuto vytváření kontrolních bodů protokolu změn, lze spouštět pouze na Databricks Runtime 13.3 LTS a novějších verzích. Pokud zakážete vytváření kontrolních bodů changelogu, čímž se vrátíte ke staršímu stylu kontrolních bodů, musíte i nadále spouštět tyto dotazy v Databricks Runtime 13.3 LTS nebo vyšší. Aby se tyto změny projevily, musíte úlohu restartovat.

Metriky úložiště stavu RocksDB

Každý stavový operátor shromažďuje metriky spojené s operacemi správy stavu prováděnými na jeho instanci RocksDB, aby bylo možné sledovat úložiště stavu a potenciálně pomoci s laděním pomalosti úloh.

Metriky pro konkrétní instanci stavového úložiště jsou označeny ID oddílu a názvem úložiště, aby zůstaly oddělené. Všechny ostatní metriky jsou agregovány jako součet pro každého operátora stavu v úloze napříč všemi úkoly, ve kterých je operátor stavu aktivní.

Tyto metriky jsou součástí customMetrics mapy uvnitř stateOperators polí v StreamingQueryProgress. Následuje příklad StreamingQueryProgress ve formátu JSON (získaném pomocí StreamingQueryProgress.json()).

{
  "id": "6774075e-8869-454b-ad51-513be86cfd43",
  "runId": "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId": 7,
  "stateOperators": [
    {
      "numRowsTotal": 20000000,
      "numRowsUpdated": 20000000,
      "memoryUsedBytes": 31005397,
      "numRowsDroppedByWatermark": 0,
      "customMetrics": {
        "SnapshotLastUploaded.partition_0_default": 7,
        "SnapshotLastUploaded.partition_1_default": 7,
        "SnapshotLastUploaded.partition_2_default": 6,
        "SnapshotLastUploaded.partition_3_default": 6,
        "SnapshotLastUploaded.partition_4_default": -1,
        "rocksdbBytesCopied": 141037747,
        "rocksdbCommitCheckpointLatency": 2,
        "rocksdbCommitCompactLatency": 22061,
        "rocksdbCommitFileSyncLatencyMs": 1710,
        "rocksdbCommitFlushLatency": 19032,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 56155,
        "rocksdbFilesCopied": 2,
        "rocksdbFilesReused": 0,
        "rocksdbGetCount": 40000000,
        "rocksdbGetLatency": 21834,
        "rocksdbPutCount": 1,
        "rocksdbPutLatency": 56155599000,
        "rocksdbReadBlockCacheHitCount": 1988,
        "rocksdbReadBlockCacheMissCount": 40341617,
        "rocksdbSstFileSize": 141037747,
        "rocksdbTotalBytesReadByCompaction": 336853375,
        "rocksdbTotalBytesReadByGet": 680000000,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWrittenByCompaction": 141037747,
        "rocksdbTotalBytesWrittenByPut": 740000012,
        "rocksdbTotalCompactionLatencyMs": 21949695000,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 7038
      }
    }
  ],
  "sources": [{}],
  "sink": {}
}

Podrobné popisy metrik jsou následující:

Název metriky Popis
rocksdbCommitWriteBatchLatency Doba (v milisekundách) potřebná k použití fázovaných zápisů do struktury v paměti (WriteBatch) do nativní RocksDB.
rocksdbCommitFlushLatency Čas (v milis) trvalo vyprázdnění změn v paměti RocksDB na místní disk.
rocksdbCommitCompactLatency Čas (v milisekundách) potřebný pro kompakci (volitelné) během potvrzení kontrolního bodu.
rocksdbCommitPauseLatency Čas (v milisekundách) potřebný k zastavení pracovních vláken na pozadí (například pro optimalizaci komprese) během potvrzení kontrolního bodu.
LatenceCommituKontrolníhoBoduRocksDB Čas (v milisekundách) potřebný na pořízení snímku nativního RocksDB a jeho zápis do místního adresáře.
rocksdbCommitFileSyncLatencyMs Čas (v milisekundách), který byl potřeba k synchronizaci nativních souborů spojených se snímky RocksDB na externí úložiště, místo kontrolního bodu.
rocksdbGetLatency Průměrná doba (v nanosekundách) připadala na základní nativní RocksDB::Get volání.
rocksdbPutCount Průměrná doba (v nanosekundách), kterou trvala základní nativní RocksDB::Put volání.
rocksdbGetCount Počet nativních RocksDB::Get volání (nezahrnuje Gets z WriteBatch - v paměťovém bloku používaném pro přípravu zápisů).
rocksdbPutCount Počet nativních volání RocksDB::Put (nezahrnuje volání Puts do WriteBatch – dávka v paměti používaná pro přípravu zápisů).
rocksdbCelkovýPočetBajtůPřečtenýchGetem Počet nekomprimovaných bajtů přečtených prostřednictvím nativních RocksDB::Get volání.
rocksdbCelkemBajtůZapsaných"Put" Počet nekomprimovaných bajtů zapsaných prostřednictvím nativních RocksDB::Put volání
rocksdbReadBlockCacheHitCount Počet použití nativní mezipaměti bloků RocksDB k tomu, aby se zabránilo čtení dat z místního disku.
rocksdbPočetChybČteníBlokuMezipaměti Počet případů, kdy chyběla data v nativní blokové mezipaměti RocksDB a bylo nutné číst data z místního disku.
rocksdbTotalBytesReadByCompaction Počet bajtů načtených z místního disku nativním procesem komprimace RocksDB
rocksdbCelkemBajtůNapsanýchKompakcí Počet bajtů zapsaných na místní disk nativním procesem komprimace RocksDB
rocksdbCelkováLatenceKompakceMs Doba (v milisekundách), kterou trvala komprimace RocksDB (jak na pozadí, tak i volitelná komprimace, která byla zahájena během potvrzování).
rocksdbWriterStallLatencyMs Čas (v milisekundách), kdy se zapisovač zastavil kvůli kompresi na pozadí nebo vyprázdnění memtables na disk.
rocksdbTotalBytesReadThroughIterator Některé stavové operace (například zpracování časového limitu v flatMapGroupsWithState nebo vodoznakování v agregacích s okny) vyžadují čtení veškerých dat v databázi prostřednictvím iterátoru. Celková velikost nekomprimovaných dat přečtených pomocí iterátoru.