Dela via


Konfigurera RocksDB-tillståndslager i Azure Databricks

Du kan aktivera RocksDB-baserad tillståndshantering genom att ange följande konfiguration i SparkSession innan du startar strömningsfrågan.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

Du kan aktivera RocksDB på Delta Live Tables pipelines. Se Optimize pipeline-konfiguration för tillståndskänslig bearbetning.

Aktivera kontrollpunkter för ändringsloggar

I Databricks Runtime 13.3 LTS och senare kan du aktivera kontrollpunkter för ändringsloggar till lägre varaktighet för kontrollpunkter och svarstid från slutpunkt till slutpunkt för strukturerade strömningsarbetsbelastningar. Databricks rekommenderar att du aktiverar kontrollpunkter för ändringslogg för alla tillståndsfrågor i Structured Streaming.

Traditionellt RocksDB State Store ögonblicksbilder och laddar upp datafiler under kontrollpunkter. För att undvika den här kostnaden skriver ändringsloggkontrollpunkter endast poster som har ändrats sedan den senaste kontrollpunkten till varaktig lagring."

Kontrollpunkter för ändringsloggar är inaktiverade som standard. Du kan aktivera kontrollpunkter för ändringsloggar på SparkSession-nivån med hjälp av följande syntax:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

Du kan aktivera kontrollpunkter för ändringsloggar i en befintlig ström och underhålla tillståndsinformationen som lagras i kontrollpunkten.

Viktigt!

Frågor som har aktiverat kontrollpunkter för ändringsloggar kan bara köras på Databricks Runtime 13.3 LTS och senare. Du kan inaktivera kontrollpunkter för ändringsloggar för att återgå till äldre kontrollpunktsbeteende, men du måste fortsätta att köra dessa frågor på Databricks Runtime 13.3 LTS eller senare. Du måste starta om jobbet för att ändringarna ska kunna genomföras.

Mått för RocksDB-tillståndslager

Varje tillståndsoperator samlar in mått relaterade till de tillståndshanteringsåtgärder som utförs på dess RocksDB-instans för att observera tillståndsarkivet och potentiellt hjälpa till med felsökning av jobb långsammare. Dessa mått aggregeras (summa) per tillståndsoperator inom jobbet för alla uppgifter where där tillståndsoperatorn körs. Dessa mått är en del av kartan customMetrics i fälten stateOperators i StreamingQueryProgress. Följande är ett exempel på StreamingQueryProgress i JSON-formulär (hämtas med ).StreamingQueryProgress.json()

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Detaljerade beskrivningar av måtten är följande:

Måttnamn beskrivning
rocksdbCommitWriteBatchLatency Tiden (i millis) tog för att tillämpa mellanlagrade skrivningar i minnesintern struktur (WriteBatch) på interna RocksDB.
rocksdbCommitFlushLatency Tiden (i millis) tog för att rensa RocksDB-minnesinterna ändringar till den lokala disken.
rocksdbCommitCompactLatency Tiden (i millis) tog för komprimering (valfritt) under kontrollpunktsincheckningen.
rocksdbCommitPauseLatency Tiden (i millis) tog för att stoppa bakgrundsarbetartrådarna (för komprimering osv.) som en del av kontrollpunktsincheckningen.
rocksdbCommitCheckpointLatency Tiden (i millis) tog för att ta en ögonblicksbild av den interna RocksDB och skriva den till en lokal katalog.
rocksdbCommitFileSyncLatencyMs Det tog tid (i millis) att synkronisera de inbyggda RocksDB-ögonblicksbildsrelaterade filerna till en extern lagringsplats (kontrollpunktsplats).
rocksdbGetLatency Genomsnittlig tid (i nanos) tog per det underliggande inbyggda RocksDB::Get anropet.
rocksdbPutCount Genomsnittlig tid (i nanos) tog per det underliggande inbyggda RocksDB::Put anropet.
rocksdbGetCount Antal interna RocksDB::Get anrop (inkluderar Gets inte från WriteBatch – i minnesbatch som används för mellanlagring av skrivningar).
rocksdbPutCount Antal interna RocksDB::Put anrop (inkluderar Puts inte WriteBatch – i minnesbatch som används för mellanlagring av skrivningar).
rocksdbTotalBytesReadByGet Antal okomprimerade byte som läses via interna RocksDB::Get anrop.
rocksdbTotalBytesWrittenByPut Antal okomprimerade byte som skrivits via interna RocksDB::Put anrop.
rocksdbReadBlockCacheHitCount Antal gånger som det interna RocksDB-blockcacheminnet används för att undvika att läsa data från en lokal disk.
rocksdbReadBlockCacheMissCount Antal gånger det interna RocksDB-blockcacheminnet missade och krävde läsning av data från den lokala disken.
rocksdbTotalBytesReadByCompaction Antal byte som lästs från den lokala disken av den interna RocksDB-komprimeringsprocessen.
rocksdbTotalBytesWrittenByCompaction Antal byte som skrivits till den lokala disken av den interna RocksDB-komprimeringsprocessen.
rocksdbTotalCompactionLatencyMs Tiden (i millis) tog för RocksDB-komprimering (både bakgrund och den valfria komprimering som initierades under incheckningen).
rocksdbWriterStallLatencyMs Tid (i millis) skrivaren har stannat på grund av en bakgrundskomprimering eller tömning av memtables till disk.
rocksdbTotalBytesReadThroughIterator Vissa tillståndskänsliga åtgärder (till exempel timeoutbearbetning i flatMapGroupsWithState eller vattenstämpling i fönsteraggregeringar) kräver att du läser hela data i DB via iteratorn. Den totala storleken på okomprimerade data som lästs med iteratorn.